# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [28]:
# Do all imports and installs here
import pandas as pd

import datetime as dt

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included?

### Step 2: Explore and Assess the Data

#### Explore the Data
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

### U.S. Demographics Data
-------

1. Loading the dataset into a Pandas df
2. Initial checks

In [116]:
demographics_df = pd.read_csv('us-cities-demographics.csv', sep=';')

In [117]:
demographics_df.dtypes

City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object

In [118]:
demographics_df.shape

(2891, 12)

In [119]:
demographics_df.columns

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')

In [120]:
demographics_df.describe()

Unnamed: 0,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,Count
count,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891.0
mean,35.494881,97328.43,101769.6,198966.8,9367.832523,40653.6,2.742543,48963.77
std,4.401617,216299.9,231564.6,447555.9,13211.219924,155749.1,0.433291,144385.6
min,22.9,29281.0,27348.0,63215.0,416.0,861.0,2.0,98.0
25%,32.8,39289.0,41227.0,80429.0,3739.0,9224.0,2.43,3435.0
50%,35.3,52341.0,53809.0,106782.0,5397.0,18822.0,2.65,13780.0
75%,38.0,86641.75,89604.0,175232.0,9368.0,33971.75,2.95,54447.0
max,70.5,4081698.0,4468707.0,8550405.0,156961.0,3212500.0,4.98,3835726.0


In [121]:
demographics_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [122]:
demographics_df['City'].nunique()

567

The demographics dataset contains data for 567 U.S. cities.

In [123]:
demographics_df['State'].nunique()

49

The demographics dataset contains data for 49 U.S. states.

**Checking for NaN values**

In [124]:
demographics_df.isnull().values.any()

True

In [125]:
demographics_df.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

There are 5 columns in the demographics dataset that have NaN values: Male Population, Female Population, Number of Veterans, Foreign-born, Average Household Size.

In [126]:
# Calculating NaN values per column as a percentage of that column's non-null count

nan_demographics_df = pd.DataFrame(data=demographics_df.isnull().sum(), columns=['NaN'])

nan_demographics_df.drop(nan_demographics_df[nan_demographics_df['NaN'] == 0].index, inplace = True)

nan_demographics_df['% of NaN'] = (nan_demographics_df['NaN']/demographics_df.count())*100
nan_demographics_df

Unnamed: 0,NaN,% of NaN
Male Population,3,0.103878
Female Population,3,0.103878
Number of Veterans,13,0.451703
Foreign-born,13,0.451703
Average Household Size,16,0.556522


The percentage of NaN values in each column is negligible (well under 1%), so we will not drop any rows.
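Note that `DataFrame.count()` returns the number of non-null values, so the cell above divides by the non-null count rather than by the total number of rows. If the share of all rows is wanted instead, `isnull().mean()` gives it directly. The sketch below shows this on a small made-up frame; the sample values are illustrative and are not taken from the dataset.

```python
import pandas as pd

# Made-up sample: one of four 'Male Population' values is missing
sample = pd.DataFrame({
    "Male Population": [100.0, None, 300.0, 400.0],
    "Total Population": [200, 400, 600, 800],
})

# isnull() yields booleans; their mean is the fraction of missing rows
nan_share = sample.isnull().mean() * 100

# Keep only the columns that actually contain missing values
nan_share = nan_share[nan_share > 0]
print(nan_share)
```

For columns with very few missing values, as here, the two denominators give almost the same figure.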

Filtering the DataFrame for a single city shows that every column except 'Race' and 'Count' is duplicated across that city's rows.

We will transform the table so that each Race category is its own column.

In [127]:
demographics_df[demographics_df['City'] == 'Silver Spring']

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
592,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,White,37756
1678,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Black or African-American,21330
2123,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,American Indian and Alaska Native,1084
2162,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Asian,8841


In [128]:
demographics_df['Race'].unique()

array(['Hispanic or Latino', 'White', 'Asian', 'Black or African-American',
       'American Indian and Alaska Native'], dtype=object)

In [129]:
demographics_df = demographics_df.set_index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race']).Count.unstack().reset_index()
demographics_df.columns.name = None
demographics_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,1813.0,2929.0,14449.0,33222.0,95487.0
1,Akron,Ohio,38.1,96886.0,100667.0,197553,12878.0,10024.0,2.24,OH,1845.0,9033.0,66551.0,3684.0,129192.0
2,Alafaya,Florida,33.5,39504.0,45760.0,85264,4176.0,15842.0,2.94,FL,,10336.0,6577.0,34897.0,63666.0
3,Alameda,California,41.4,37747.0,40867.0,78614,4504.0,18841.0,2.52,CA,1329.0,27984.0,7364.0,8265.0,44232.0
4,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,445.0,650.0,53440.0,1783.0,17160.0


Now, when we check the shape of the DataFrame, we can see that the duplicated rows have been removed.

In [130]:
demographics_df.shape

(596, 15)
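To see the reshape in isolation, the sketch below applies the same `set_index(...)` followed by `unstack()` pattern to a small made-up frame. The city names, states and counts are placeholders chosen for illustration only.

```python
import pandas as pd

# Made-up long-format data: one row per (city, race) pair
toy = pd.DataFrame({
    "City": ["A", "A", "B"],
    "State": ["X", "X", "Y"],
    "Race": ["Asian", "White", "White"],
    "Count": [10, 20, 30],
})

# Move the identifying columns into the index, then pivot the innermost
# index level ('Race') into columns
wide = toy.set_index(["City", "State", "Race"])["Count"].unstack().reset_index()
wide.columns.name = None
print(wide)
```

City B has no 'Asian' row in the input, so its 'Asian' cell is NaN after the reshape. This is also why the race columns of the real dataset gain NaN values after the transformation.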

The number of unique Cities and States is the same as before the dataset was transformed.

In [131]:
demographics_df['City'].nunique()

567

In [132]:
demographics_df['State'].nunique()

49

In [133]:
demographics_df.duplicated().sum()

0
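The frame now has 596 rows but only 567 unique city names. One plausible explanation, not verified here, is that some city names occur in more than one state. A check along the following lines would confirm or rule that out; the sample rows are made up for illustration.

```python
import pandas as pd

# Made-up sample in which one city name appears in two states
sample = pd.DataFrame({
    "City": ["Springfield", "Springfield", "Akron"],
    "State": ["Illinois", "Missouri", "Ohio"],
})

# Count the distinct states each city name appears in
states_per_city = sample.groupby("City")["State"].nunique()

# City names shared by more than one state
shared_names = states_per_city[states_per_city > 1]
print(shared_names)
```

Running the same two lines against `demographics_df` would list the city names that account for the difference, if this explanation holds.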

In [134]:
demographics_df.isna().sum()

City                                  0
State                                 0
Median Age                            0
Male Population                       1
Female Population                     1
Total Population                      0
Number of Veterans                    7
Foreign-born                          7
Average Household Size                8
State Code                            0
American Indian and Alaska Native    57
Asian                                13
Black or African-American            12
Hispanic or Latino                    0
White                                 7
dtype: int64

In [135]:
# Calculating NaN values per column as a percentage of that column's non-null count

nan_demographics_df_2 = pd.DataFrame(data=demographics_df.isnull().sum(), columns=['NaN'])

nan_demographics_df_2.drop(nan_demographics_df_2[nan_demographics_df_2['NaN'] == 0].index, inplace = True)

nan_demographics_df_2['% of NaN'] = (nan_demographics_df_2['NaN']/demographics_df.count())*100
nan_demographics_df_2

Unnamed: 0,NaN,% of NaN
Male Population,1,0.168067
Female Population,1,0.168067
Number of Veterans,7,1.188455
Foreign-born,7,1.188455
Average Household Size,8,1.360544
American Indian and Alaska Native,57,10.575139
Asian,13,2.229846
Black or African-American,12,2.054795
White,7,1.188455


In [136]:
# Removing name of index column
demographics_df = demographics_df.rename_axis(None, axis=1)

In [137]:
# Creating an independent copy so that later changes do not modify demographics_df
dems_df = demographics_df.copy()
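A note on this step: in pandas, plain assignment such as `b = a` binds a second name to the same DataFrame, while `a.copy()` produces an independent one. The sketch below shows the difference on a made-up frame; the variable and column names are illustrative.

```python
import pandas as pd

original = pd.DataFrame({"Total Population": [100, 200]})

alias = original               # same object under a second name
independent = original.copy()  # separate object with its own data

alias["flag"] = 1        # also visible through `original`
independent["other"] = 2  # not visible through `original`

print(list(original.columns))
```

This matters whenever the original frame is needed again later in its unmodified form.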

In [138]:
dems_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,1813.0,2929.0,14449.0,33222.0,95487.0
1,Akron,Ohio,38.1,96886.0,100667.0,197553,12878.0,10024.0,2.24,OH,1845.0,9033.0,66551.0,3684.0,129192.0
2,Alafaya,Florida,33.5,39504.0,45760.0,85264,4176.0,15842.0,2.94,FL,,10336.0,6577.0,34897.0,63666.0
3,Alameda,California,41.4,37747.0,40867.0,78614,4504.0,18841.0,2.52,CA,1329.0,27984.0,7364.0,8265.0,44232.0
4,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,445.0,650.0,53440.0,1783.0,17160.0


**Calculations**

1. Finding the % breakdown of the following columns: Male Population, Female Population per City
2. Finding the % breakdown of the following columns: American Indian and Alaska Native, Asian, Black or African-American, Hispanic or Latino and White per City
3. Finding the % of Foreign-born per City
4. Finding the % of Veterans per City

% breakdown per Gender

Let's add two new columns: 'pct_male_pop' and 'pct_female_pop'

In [139]:
dems_df['pct_male_pop'] = (dems_df['Male Population']/dems_df['Total Population'])*100
dems_df['pct_female_pop'] = (dems_df['Female Population']/dems_df['Total Population'])*100

In [140]:
dems_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White,pct_male_pop,pct_female_pop
0,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,1813.0,2929.0,14449.0,33222.0,95487.0,51.80654,48.19346
1,Akron,Ohio,38.1,96886.0,100667.0,197553,12878.0,10024.0,2.24,OH,1845.0,9033.0,66551.0,3684.0,129192.0,49.043042,50.956958
2,Alafaya,Florida,33.5,39504.0,45760.0,85264,4176.0,15842.0,2.94,FL,,10336.0,6577.0,34897.0,63666.0,46.331394,53.668606
3,Alameda,California,41.4,37747.0,40867.0,78614,4504.0,18841.0,2.52,CA,1329.0,27984.0,7364.0,8265.0,44232.0,48.015621,51.984379
4,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,445.0,650.0,53440.0,1783.0,17160.0,44.572417,55.427583


In [65]:
# gender_dems_df = demographics_df.groupby(['State'])['Male Population', 'Female Population', 'Total Population'].sum()
# gender_dems_df

% breakdown per Race category

In [141]:
# American Indian and Alaska Native
dems_df['pct_native'] = (dems_df['American Indian and Alaska Native']/dems_df['Total Population'])*100

# Asian
dems_df['pct_asian'] = (dems_df['Asian']/dems_df['Total Population'])*100

# Black or African-American
dems_df['pct_black'] = (dems_df['Black or African-American']/dems_df['Total Population'])*100

# Hispanic or Latino
dems_df['pct_hispanic'] = (dems_df['Hispanic or Latino']/dems_df['Total Population'])*100

# White
dems_df['pct_white'] = (dems_df['White']/dems_df['Total Population'])*100
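The five assignments above share one pattern, so they could also be written as a loop over a mapping from source column to new column name. This is only a sketch of that alternative on a made-up two-row frame, not the method used in the notebook.

```python
import pandas as pd

# Made-up sample with one missing 'White' count
sample = pd.DataFrame({
    "Total Population": [1000, 2000],
    "Asian": [100, 500],
    "White": [600, None],
})

# Source column -> name of the percentage column to create
pct_columns = {"Asian": "pct_asian", "White": "pct_white"}

for source_col, pct_col in pct_columns.items():
    sample[pct_col] = sample[source_col] / sample["Total Population"] * 100

print(sample[["pct_asian", "pct_white"]])
```

A missing count propagates to a missing percentage, which matches the NaN in `pct_native` for Alafaya in the output that follows.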

In [142]:
dems_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,...,Black or African-American,Hispanic or Latino,White,pct_male_pop,pct_female_pop,pct_native,pct_asian,pct_black,pct_hispanic,pct_white
0,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,...,14449.0,33222.0,95487.0,51.80654,48.19346,1.440306,2.326893,11.478757,26.39264,75.857987
1,Akron,Ohio,38.1,96886.0,100667.0,197553,12878.0,10024.0,2.24,OH,...,66551.0,3684.0,129192.0,49.043042,50.956958,0.933927,4.572444,33.687669,1.864816,65.396122
2,Alafaya,Florida,33.5,39504.0,45760.0,85264,4176.0,15842.0,2.94,FL,...,6577.0,34897.0,63666.0,46.331394,53.668606,,12.122349,7.713689,40.928176,74.669263
3,Alameda,California,41.4,37747.0,40867.0,78614,4504.0,18841.0,2.52,CA,...,7364.0,8265.0,44232.0,48.015621,51.984379,1.690539,35.596713,9.367288,10.513395,56.264787
4,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,...,53440.0,1783.0,17160.0,44.572417,55.427583,0.6258,0.91409,75.152231,2.507418,24.131966


% breakdown of Foreign Born

In [143]:
dems_df['pct_foreign_born'] = (dems_df['Foreign-born']/dems_df['Total Population'])*100

% breakdown of Veterans

In [145]:
dems_df['pct_veterans'] = (dems_df['Number of Veterans']/dems_df['Total Population'])*100

In [146]:
dems_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,...,White,pct_male_pop,pct_female_pop,pct_native,pct_asian,pct_black,pct_hispanic,pct_white,pct_foreign_born,pct_veterans
0,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,...,95487.0,51.80654,48.19346,1.440306,2.326893,11.478757,26.39264,75.857987,6.457943,7.44145
1,Akron,Ohio,38.1,96886.0,100667.0,197553,12878.0,10024.0,2.24,OH,...,129192.0,49.043042,50.956958,0.933927,4.572444,33.687669,1.864816,65.396122,5.074081,6.518757
2,Alafaya,Florida,33.5,39504.0,45760.0,85264,4176.0,15842.0,2.94,FL,...,63666.0,46.331394,53.668606,,12.122349,7.713689,40.928176,74.669263,18.57994,4.897729
3,Alameda,California,41.4,37747.0,40867.0,78614,4504.0,18841.0,2.52,CA,...,44232.0,48.015621,51.984379,1.690539,35.596713,9.367288,10.513395,56.264787,23.966469,5.729259
4,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,...,17160.0,44.572417,55.427583,0.6258,0.91409,75.152231,2.507418,24.131966,1.210817,7.606632


In [147]:
dems_df.columns

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code',
       'American Indian and Alaska Native', 'Asian',
       'Black or African-American', 'Hispanic or Latino', 'White',
       'pct_male_pop', 'pct_female_pop', 'pct_native', 'pct_asian',
       'pct_black', 'pct_hispanic', 'pct_white', 'pct_foreign_born',
       'pct_veterans'],
      dtype='object')

Dropping Columns

In [148]:
dems_df = dems_df.drop(
    ['Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born',
     'American Indian and Alaska Native', 'Asian', 'Black or African-American',
     'Hispanic or Latino', 'White'],
    axis=1)
dems_df.columns

Index(['City', 'State', 'Median Age', 'Total Population',
       'Average Household Size', 'State Code', 'pct_male_pop',
       'pct_female_pop', 'pct_native', 'pct_asian', 'pct_black',
       'pct_hispanic', 'pct_white', 'pct_foreign_born', 'pct_veterans'],
      dtype='object')

Renaming columns

In [149]:
dems_df.rename(columns={'City': 'city', 'State': 'state', 'Median Age': 'median_age', 'Total Population': 'tot_pop', 'Average Household Size': 'avg_house_size'}, inplace=True)
dems_df.columns

Index(['city', 'state', 'median_age', 'tot_pop', 'avg_house_size',
       'State Code', 'pct_male_pop', 'pct_female_pop', 'pct_native',
       'pct_asian', 'pct_black', 'pct_hispanic', 'pct_white',
       'pct_foreign_born', 'pct_veterans'],
      dtype='object')

In [150]:
# Saving the clean df to a new csv
dems_df.to_csv('demographics_data_clean.csv', index=False)

In [151]:
print('Total records in df: ' + str(dems_df.shape[0]))

Total records in df: 596


### Immigration dataset
-----

N.B. The Immigration dataset is large, so a small subset of it is used below for exploratory purposes only.

In [88]:
# Reading in the immigration data subset into a pandas df

immigration_df = pd.read_csv('immigration_data_sample.csv')

In [89]:
immigration_df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [90]:
immigration_df.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [91]:
immigration_df.shape

(1000, 29)

**Reading in the whole Immigration dataset** 

We will be using Spark to work with the large Immigration dataset.

Spark can read the format the Immigration dataset is stored in, .sas7bdat, through the spark-sas7bdat package.


In [93]:
from pyspark.sql import SparkSession

In [94]:
spark = (
    SparkSession.builder
    .config("spark.jars.repositories", "https://repos.spark-packages.org/")
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

*Reading in the Immigration data for April 2016 into Spark*

In [95]:
# Referenced from the following knowledge thread: https://knowledge.udacity.com/questions/437987
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df_spark = spark.read.format('com.github.saurfang.sas.spark').load(fname)

In [96]:
immigration_df_spark.count()

3096313

We can confirm that the file has been read correctly.

In [97]:
immigration_df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [98]:
immigration_df_spark.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [None]:
# write to parquet
immigration_df_spark.write.parquet("sas_data")
immigration_df_spark=spark.read.parquet("sas_data")

### Airport Codes Data
----------

In [62]:
airport_codes_df = pd.read_csv('airport-codes_csv.csv')

In [63]:
airport_codes_df.shape

(55075, 12)

In [64]:
airport_codes_df.dtypes

ident            object
type             object
name             object
elevation_ft    float64
continent        object
iso_country      object
iso_region       object
municipality     object
gps_code         object
iata_code        object
local_code       object
coordinates      object
dtype: object

In [65]:
airport_codes_df.columns

Index(['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country',
       'iso_region', 'municipality', 'gps_code', 'iata_code', 'local_code',
       'coordinates'],
      dtype='object')

In [66]:
airport_codes_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [67]:
airport_codes_df['iso_country'].nunique()

243

This dataset contains data for multiple countries. 

We will drop the data for every country other than the U.S., because the Immigration dataset we will be using covers immigration into the U.S. only.

In [68]:
# Filtering the dataset so we only have airport data for the U.S.

us_airports_df = airport_codes_df[airport_codes_df['iso_country'] == 'US']

In [69]:
us_airports_df.shape

(22757, 12)

Filtering the dataset for 'iso_country' == 'US' has reduced the number of rows from 55,075 to 22,757, which is about 41% of the original.

**Checking for NaN Values**

In [70]:
us_airports_df.isnull().values.any()

True

In [71]:
us_airports_df.isnull().sum()

ident               0
type                0
name                0
elevation_ft      239
continent       22756
iso_country         0
iso_region          0
municipality      102
gps_code         1773
iata_code       20738
local_code       1521
coordinates         0
dtype: int64

In [72]:
us_airports_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


**Checking for duplicates**

In [73]:
us_airports_df.duplicated().sum()

0

Dropping rows and columns

In [74]:
us_airports_df = us_airports_df.drop(['elevation_ft', 'continent', 'gps_code', 'local_code', 'coordinates'], axis=1)

In [75]:
us_airports_df.columns

Index(['ident', 'type', 'name', 'iso_country', 'iso_region', 'municipality',
       'iata_code'],
      dtype='object')

In [78]:
us_airports_df.head()

Unnamed: 0,ident,type,name,iso_country,iso_region,municipality,iata_code
440,07FA,small_airport,Ocean Reef Club Airport,US,US-FL,Key Largo,OCA
594,0AK,small_airport,Pilot Station Airport,US,US-AK,Pilot Station,PQS
673,0CO2,small_airport,Crested Butte Airpark,US,US-CO,Crested Butte,CSE
1088,0TE7,small_airport,LBJ Ranch Airport,US,US-TX,Johnson City,JCY
1402,13MA,small_airport,Metropolitan Airport,US,US-MA,Palmer,PMX


In [80]:
us_airport_df_final = us_airports_df.dropna(subset=["iata_code"])

In [81]:
us_airport_df_final.head()

Unnamed: 0,ident,type,name,iso_country,iso_region,municipality,iata_code
440,07FA,small_airport,Ocean Reef Club Airport,US,US-FL,Key Largo,OCA
594,0AK,small_airport,Pilot Station Airport,US,US-AK,Pilot Station,PQS
673,0CO2,small_airport,Crested Butte Airpark,US,US-CO,Crested Butte,CSE
1088,0TE7,small_airport,LBJ Ranch Airport,US,US-TX,Johnson City,JCY
1402,13MA,small_airport,Metropolitan Airport,US,US-MA,Palmer,PMX


In [82]:
us_airport_df_final.shape

(2019, 7)

Removing the rows with a missing iata_code has reduced the number of rows to 2019.

In [83]:
us_airport_df_final.isnull().sum()

ident           0
type            0
name            0
iso_country     0
iso_region      0
municipality    6
iata_code       0
dtype: int64

Renaming columns

In [84]:
us_airport_df_final.rename(columns={'type': 'airport_type', 'name': 'airport_name'}, inplace=True)
us_airport_df_final.columns

Index(['ident', 'airport_type', 'airport_name', 'iso_country', 'iso_region',
       'municipality', 'iata_code'],
      dtype='object')

In [85]:
# Saving to CSV
us_airport_df_final.to_csv("us_airports_clean.csv", index=False)

### Temperature Data
-------

1. Check dataset
2. Convert date to a datetime object
3. Drop rows with NaN values and check for duplicate rows
4. Drop unnecessary columns
5. Filter for U.S. data only
6. Rename columns
7. Add port code from the SAS text file

In [3]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_df = pd.read_csv(fname)

In [4]:
temp_df.dtypes

dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
dtype: object

In [5]:
temp_df.shape

(8599212, 7)

In [6]:
temp_df.columns

Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude'],
      dtype='object')

In [7]:
temp_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


**Converting date to a datetime object**

In [8]:
temp_df['dt'] = pd.to_datetime(temp_df['dt'])

In [9]:
temp_df['year'] = temp_df['dt'].dt.year

In [10]:
temp_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,year
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E,1743
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E,1743
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E,1744
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E,1744
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E,1744


In [11]:
temp_df['year'].max()

2013

In [12]:
temp_df['year'].min()

1743

**Checking for NaN Values**

In [13]:
temp_df.isnull().values.any()

True

In [14]:
temp_df.isnull().sum()

dt                                    0
AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
year                                  0
dtype: int64

Only two columns have NaN values: AverageTemperature and AverageTemperatureUncertainty.

In [15]:
# Calculating NaN values per column as a percentage of that column's non-null count

nan_temp_df = pd.DataFrame(data=temp_df.isnull().sum(), columns=['NaN'])

nan_temp_df.drop(nan_temp_df[nan_temp_df['NaN'] == 0].index, inplace = True)

nan_temp_df['% of NaN'] = (nan_temp_df['NaN']/temp_df.count())*100
nan_temp_df

Unnamed: 0,NaN,% of NaN
AverageTemperature,364130,4.421692
AverageTemperatureUncertainty,364130,4.421692


In [16]:
temp_df[temp_df['AverageTemperature'].isnull()]

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,year
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E,1743
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E,1744
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E,1744
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E,1744
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E,1744
18,1745-05-01,,,Århus,Denmark,57.05N,10.33E,1745
19,1745-06-01,,,Århus,Denmark,57.05N,10.33E,1745
20,1745-07-01,,,Århus,Denmark,57.05N,10.33E,1745
21,1745-08-01,,,Århus,Denmark,57.05N,10.33E,1745
22,1745-09-01,,,Århus,Denmark,57.05N,10.33E,1745


Both columns have the same number of NaN values (364,130), so we assume that a row with a NaN value in 'AverageTemperature' also has a NaN value in 'AverageTemperatureUncertainty'. We will therefore drop all rows with NaN values.
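The assumption can also be checked directly rather than inferred from the matching counts. The sketch below compares the two null masks row by row on a made-up frame; on the real data the same comparison would be run against `temp_df`.

```python
import pandas as pd

# Made-up sample in which both columns are missing in the same rows
sample = pd.DataFrame({
    "AverageTemperature": [6.068, None, None, 5.1],
    "AverageTemperatureUncertainty": [1.737, None, None, 1.2],
})

temp_missing = sample["AverageTemperature"].isnull()
uncertainty_missing = sample["AverageTemperatureUncertainty"].isnull()

# True only if the two columns are missing in exactly the same rows
same_rows_missing = bool((temp_missing == uncertainty_missing).all())
print(same_rows_missing)
```

If the result is `True`, dropping rows with any NaN removes exactly the rows that lack a temperature reading and nothing else.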

In [17]:
temp_df.dropna(axis=0, inplace=True)

In [18]:
temp_df.shape

(8235082, 8)

In [19]:
temp_df.isna().sum()

dt                               0
AverageTemperature               0
AverageTemperatureUncertainty    0
City                             0
Country                          0
Latitude                         0
Longitude                        0
year                             0
dtype: int64

How many different Cities and Countries does this dataset contain?

In [20]:
temp_df['City'].nunique()

3448

The temperature dataset contains data for 3448 cities.

In [21]:
temp_df['Country'].nunique()

159

The temperature dataset contains data for 159 countries.

**Checking for duplicates**

In [22]:
temp_df.duplicated().sum()

0

There are no duplicate rows in this dataset.

*For this project, we will be focusing solely on the U.S., so we will filter the dataset to only include data for U.S. cities.*

In [23]:
temp_df = temp_df[temp_df['Country'] == 'United States']

In [24]:
temp_df.shape

(661524, 8)

In [26]:
# Dropping 3 columns

temp_df = temp_df.drop(['AverageTemperatureUncertainty', 'Latitude', 'Longitude'], axis=1)
temp_df.columns

Index(['dt', 'AverageTemperature', 'City', 'Country', 'year'], dtype='object')

In [28]:
# Renaming the 'dt' column
temp_df.rename(columns={'dt': 'date'}, inplace=True)
temp_df.columns

Index(['date', 'AverageTemperature', 'City', 'Country', 'year'], dtype='object')

In [29]:
temp_df.rename(columns={'AverageTemperature': 'avg_temp', 'City': 'city', 'Country': 'country'}, inplace=True)
temp_df.columns

Index(['date', 'avg_temp', 'city', 'country', 'year'], dtype='object')

In [31]:
temp_df.head()

Unnamed: 0,date,avg_temp,city,country,year
47555,1820-01-01,2.101,Abilene,United States,1820
47556,1820-02-01,6.926,Abilene,United States,1820
47557,1820-03-01,10.767,Abilene,United States,1820
47558,1820-04-01,17.989,Abilene,United States,1820
47559,1820-05-01,21.809,Abilene,United States,1820


In [33]:
# Saving the clean df to a new csv
temp_df.to_csv('temp_data_clean.csv', index=False)

**TO DO !!!!**

In [None]:
# Mapping the U.S. cities to port abbreviations (i94port from the SAS labels file)

# Add i94port code based on city name
df_temperature_data = df_temperature_data.withColumn("i94port", get_i94port(df_temperature_data.City))

# Remove data points with no i94port code
df_temperature_data = df_temperature_data.filter(df_temperature_data.i94port != 'null')

In [123]:
fname = 'temp_data_clean.csv'
temp_df = spark.read.format('csv').option('delimiter', ',').option('header', 'true').load(fname)

In [113]:
# Referenced from the following knowledge thread: https://knowledge.udacity.com/questions/125439
# Referenced from the following knowledge thread: https://knowledge.udacity.com/questions/801811

with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')
    
def code_mapper(file, idx):
    # Slice from the label name (idx) up to the ';' that terminates its value block
    f_content2 = file[file.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    f_content2 = [i.replace("'", "") for i in f_content2]
    # Skip the header line, then split each remaining line into code and description
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic

i94_cit_and_res = code_mapper(f_content, "i94cntyl")
i94_port = code_mapper(f_content, "i94prtl")
i94_mode = code_mapper(f_content, "i94model")
i94_addr = code_mapper(f_content, "i94addrl")
i94_visa = {'1': 'Business',
            '2': 'Pleasure',
            '3': 'Student'}
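
To make the parsing step easier to follow, here is a standalone sketch of the same slicing logic applied to a small in-memory string. The name `parse_label_block` and the `sample_sas` text are illustrative assumptions: the sample only approximates the layout of the SAS labels file and is not copied from it.

```python
def parse_label_block(content, label):
    """Return {code: description} for the value block that starts at `label`."""
    # Keep only the text from the label name up to the ';' that ends the block.
    block = content[content.index(label):]
    lines = block[:block.index(';')].split('\n')
    lines = [line.replace("'", "") for line in lines]
    # The first line is the label itself; the rest look like "code = description".
    pairs = [line.split('=') for line in lines[1:]]
    return {p[0].strip(): p[1].strip() for p in pairs if len(p) == 2}


# Hypothetical excerpt mimicking the structure of a SAS labels file.
sample_sas = (
    "value $i94prtl\n"
    "\t'ALC'\t=\t'ALCAN, AK             '\n"
    "\t'ANC'\t=\t'ANCHORAGE, AK         '\n"
    ";\n"
    "value i94model\n"
    "\t1 = 'Air'\n"
    "\t2 = 'Sea'\n"
    ";\n"
)
sample_content = sample_sas.replace('\t', '')

print(parse_label_block(sample_content, "i94prtl"))
print(parse_label_block(sample_content, "i94model"))
```

Because each block is cut at the first `;` after the label, a description that itself contained a semicolon would truncate the block early, so it is worth spot-checking the resulting dictionaries.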

In [115]:
i94_port

{'ALC': 'ALCAN, AK',
 'ANC': 'ANCHORAGE, AK',
 'BAR': 'BAKER AAF - BAKER ISLAND, AK',
 'DAC': 'DALTONS CACHE, AK',
 'PIZ': 'DEW STATION PT LAY DEW, AK',
 'DTH': 'DUTCH HARBOR, AK',
 'EGL': 'EAGLE, AK',
 'FRB': 'FAIRBANKS, AK',
 'HOM': 'HOMER, AK',
 'HYD': 'HYDER, AK',
 'JUN': 'JUNEAU, AK',
 '5KE': 'KETCHIKAN, AK',
 'KET': 'KETCHIKAN, AK',
 'MOS': 'MOSES POINT INTERMEDIATE, AK',
 'NIK': 'NIKISKI, AK',
 'NOM': 'NOM, AK',
 'PKC': 'POKER CREEK, AK',
 'ORI': 'PORT LIONS SPB, AK',
 'SKA': 'SKAGWAY, AK',
 'SNP': 'ST. PAUL ISLAND, AK',
 'TKI': 'TOKEEN, AK',
 'WRA': 'WRANGELL, AK',
 'HSV': 'MADISON COUNTY - HUNTSVILLE, AL',
 'MOB': 'MOBILE, AL',
 'LIA': 'LITTLE ROCK, AR (BPS)',
 'ROG': 'ROGERS ARPT, AR',
 'DOU': 'DOUGLAS, AZ',
 'LUK': 'LUKEVILLE, AZ',
 'MAP': 'MARIPOSA AZ',
 'NAC': 'NACO, AZ',
 'NOG': 'NOGALES, AZ',
 'PHO': 'PHOENIX, AZ',
 'POR': 'PORTAL, AZ',
 'SLU': 'SAN LUIS, AZ',
 'SAS': 'SASABE, AZ',
 'TUC': 'TUCSON, AZ',
 'YUI': 'YUMA, AZ',
 'AND': 'ANDRADE, CA',
 'BUR': 'BURBANK, CA',
 '

Now we can map the port codes in the `i94_port` dictionary to the cities in the temperature dataset.

We will add a new column called `i94_port` to the temperature dataset and fill it with the matching port code.

N.B. In the temperature dataset, the names in the `city` column are in title case (e.g. Abilene).

However, in the `i94_port` dictionary, the port names are in upper case and include the state (e.g. ANCHORAGE, AK).

We will therefore apply `.lower()` to both values so that the comparison is case-insensitive.

In [128]:
def city_port_code(city):

    for port in i94_port:
        if city.lower() in i94_port[port].lower():
            return port


temp_df = temp_df.withColumn('i94_port', city_port_code(temp_df.city))

TypeError: 'Column' object is not callable
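
The `TypeError` above most likely arises because `temp_df.city` is a Spark `Column` rather than a Python string, so `city.lower()` is not a string method call here. One way around it is to keep the lookup as a plain Python function and register it as a UDF. The sketch below is a minimal version under some assumptions: `build_city_lookup` and `sample_ports` are illustrative names, and it matches on the exact city name before the comma instead of the substring test used above, a swapped-in choice meant to avoid partial matches.

```python
def build_city_lookup(port_names):
    """Map lower-cased city name -> port code, using the text before the first comma."""
    lookup = {}
    for code, name in port_names.items():
        city = name.split(",")[0].strip().lower()
        # Keep the first code seen for a city (KETCHIKAN appears under two codes).
        lookup.setdefault(city, code)
    return lookup


def city_port_code(city, lookup):
    """Return the port code for a city name, or None when there is no match."""
    if city is None:
        return None
    return lookup.get(city.strip().lower())


# A few entries copied from the i94_port output above.
sample_ports = {
    "ANC": "ANCHORAGE, AK",
    "5KE": "KETCHIKAN, AK",
    "KET": "KETCHIKAN, AK",
    "PHO": "PHOENIX, AZ",
}
lookup = build_city_lookup(sample_ports)
print(city_port_code("Phoenix", lookup))
print(city_port_code("Abilene", lookup))

# With a Spark session available, the same function could be registered as a UDF:
#   from pyspark.sql.functions import udf
#   from pyspark.sql.types import StringType
#   port_udf = udf(lambda c: city_port_code(c, lookup), StringType())
#   temp_df = temp_df.withColumn('i94_port', port_udf(temp_df.city))
```

Building the lookup once keeps each call to a single dictionary access, which matters when the function runs on every row of a table with several hundred thousand rows.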

In [127]:
temp_df.show()

+----------+------------------+-------+-------------+----+
|      date|          avg_temp|   city|      country|year|
+----------+------------------+-------+-------------+----+
|1820-01-01|2.1010000000000004|Abilene|United States|1820|
|1820-02-01|             6.926|Abilene|United States|1820|
|1820-03-01|10.767000000000001|Abilene|United States|1820|
|1820-04-01|17.988999999999994|Abilene|United States|1820|
|1820-05-01|            21.809|Abilene|United States|1820|
|1820-06-01|            25.682|Abilene|United States|1820|
|1820-07-01|            26.268|Abilene|United States|1820|
|1820-08-01|25.048000000000002|Abilene|United States|1820|
|1820-09-01|            22.435|Abilene|United States|1820|
|1820-10-01|             15.83|Abilene|United States|1820|
|1820-11-01|             9.408|Abilene|United States|1820|
|1820-12-01|             3.853|Abilene|United States|1820|
|1821-01-01| 5.276000000000002|Abilene|United States|1821|
|1821-02-01| 7.587999999999999|Abilene|United States|182

In [None]:
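# Create udf to map city full name to city port abbreviation
from pyspark.sql.functions import udf, year, month, col
from pyspark.sql.types import StringType

# NOTE: valid_ports is assumed to be a {port code: port name} dictionary such as i94_port above
@udf(StringType())
def city_to_port(city):
    # Guard against null city values, which would otherwise raise on .lower()
    if city is None:
        return None
    for key in valid_ports:
        if city.lower() in valid_ports[key].lower():
            return key

# Clean temperature data

# Only use temperatures from United States
# Map full name to city port abbreviation
# Remove invalid ports
# NOTE: temperature_df is assumed to be the raw temperature data loaded as a Spark DataFrame
cleaned_temp_df = temperature_df.filter(temperature_df["Country"] == "United States") \
    .withColumn("year", year(temperature_df['dt'])) \
    .withColumn("month", month(temperature_df["dt"])) \
    .withColumn("i94port", city_to_port(temperature_df["City"])) \
    .withColumn("AverageTemperature", col("AverageTemperature").cast("float")) \
    .dropna(how='any', subset=["i94port"])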






@udf()
def get_i94port(city):
    '''
    Input: City name
    Output: Corresponding i94port
    '''
    for key in i94portvalid:
        if city.lower() in i94portvalid[key][0].lower():
            return key


# Add i94port code based on city name
df_temperature_data = df_temperature_data.withColumn("i94port", get_i94port(df_temperature_data.City))
df_temperature_data.show()

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
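
As a starting point for the checks listed above, the following is a minimal sketch that covers a completeness check, a required-field check, and a unique-key check. It assumes rows are plain dictionaries (for example, as produced by `csv.DictReader` over `temp_data_clean.csv`); the function name `run_quality_checks` and the choice of `city` plus `date` as the key are illustrative assumptions, not part of the final data model.

```python
from collections import Counter


def run_quality_checks(rows, key_fields, required_fields):
    """Return a list of readable failures; an empty list means all checks passed."""
    failures = []

    # Completeness: the table must contain at least one row.
    if not rows:
        failures.append("table is empty")
        return failures

    # Integrity: required fields must be present and non-empty in every row.
    for field in required_fields:
        missing = sum(1 for row in rows if row.get(field) in (None, ""))
        if missing:
            failures.append(f"{missing} row(s) missing '{field}'")

    # Uniqueness: the combination of key fields must identify a single row.
    key_counts = Counter(tuple(row.get(f) for f in key_fields) for row in rows)
    duplicates = sum(1 for count in key_counts.values() if count > 1)
    if duplicates:
        failures.append(f"{duplicates} duplicated key(s) on {key_fields}")

    return failures


# Illustrative rows shaped like the cleaned temperature data.
sample_rows = [
    {"date": "1820-01-01", "avg_temp": "2.101", "city": "Abilene", "country": "United States"},
    {"date": "1820-02-01", "avg_temp": "6.926", "city": "Abilene", "country": "United States"},
]

print(run_quality_checks(sample_rows, key_fields=["city", "date"], required_fields=["city", "date"]))
```

Returning a list of failures rather than raising on the first one lets a pipeline run report every problem at once; the same checks could be expressed on a pandas or Spark DataFrame once the model is defined.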

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.