# US Immigration And Its Contributing Factors
### Data Engineering Capstone Project

#### Project Summary
This project studies the relationship between immigration to US states and population demographics, temperature, and airports.  
It uses a star schema to build dimension and fact tables from publicly sourced data.  
The processes developed in this Jupyter Notebook are based on on-premises tools.  
It uses sqlite to build a data warehouse and develops ETL scripts with Python and SQL. 
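
As a rough illustration of the star-schema idea, the sketch below builds one dimension table and one fact table in an in-memory SQLite database and joins them on a state code. The table names, column names, and sample rows are hypothetical and are not the schema defined later in this project.

```python
import sqlite3

# In-memory database; nothing is written to disk.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Hypothetical dimension table: one row per state.
cur.execute("""
    CREATE TABLE dim_state (
        state_code TEXT PRIMARY KEY,
        state_name TEXT NOT NULL
    )
""")

# Hypothetical fact table: one row per arrival, pointing at the dimension.
cur.execute("""
    CREATE TABLE fact_arrival (
        arrival_id INTEGER PRIMARY KEY,
        state_code TEXT REFERENCES dim_state (state_code),
        arrivals   INTEGER NOT NULL
    )
""")

cur.executemany("INSERT INTO dim_state VALUES (?, ?)",
                [("CA", "California"), ("NV", "Nevada")])
cur.executemany("INSERT INTO fact_arrival VALUES (?, ?, ?)",
                [(1, "CA", 1), (2, "CA", 1), (3, "NV", 1)])

# Aggregate the fact table and decorate it with a dimension attribute.
arrivals_by_state = cur.execute("""
    SELECT d.state_name, SUM(f.arrivals) AS total_arrivals
    FROM fact_arrival AS f
    JOIN dim_state AS d ON d.state_code = f.state_code
    GROUP BY d.state_name
    ORDER BY d.state_name
""").fetchall()
conn.close()
```

Keeping descriptive attributes in the dimension table and only keys and measures in the fact table is what lets the same fact rows be grouped by any dimension attribute.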

The project follows these steps:
* [Step 1: Scope the Project and Gather Data](#step1)
* [Step 2: Explore and Assess the Data](#step2)
* [Step 3: Define the Data Model](#model)
* [Step 4: Run ETL to Model the Data](#check)
* [Step 5: Complete Project Write Up](#complete)

In [1]:
# !pip install --upgrade pandas

In [2]:
# !pip install --upgrade numpy

In [3]:
# !pip install --upgrade numexpr

In [4]:
# !pip install ipython-sql

In [9]:
#!pip install pyspark-pandas

In [2]:
# !pyspark --version

In [1]:
# Do all imports and installs here
import pandas as pd
import numpy as np
from io import StringIO
import configparser
from sqlalchemy import create_engine
import os
import boto3 # aws Python API
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col, date_add, expr, regexp_replace
import psycopg2

In [219]:
# engine = sqlalchemy.create_engine('postgres://username:password@url:5439/db_name')

In [2]:
# show all the columns
pd.set_option('display.max_columns', None)

In [3]:
# Run sql query
%load_ext sql

In [4]:
# read the parameters from the AWS configuration file
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

ARN = config.get('IAM_ROLE', 'ARN')

DWH_ENDPOINT = config.get("CLUSTER", "HOST")
DWH_DB = config.get("CLUSTER", "DB_NAME") 
DWH_DB_USER = config.get("CLUSTER", "DB_USER") 
DWH_DB_PASSWORD = config.get("CLUSTER", "DB_PASSWORD")
DWH_PORT = config.get("CLUSTER", "DB_PORT") 
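
For reference, the calls above imply a `dwh.cfg` file with `AWS`, `IAM_ROLE`, and `CLUSTER` sections. The sketch below parses an equivalent layout from a string. Every value is a placeholder rather than a real credential or endpoint, and the port 5439 is only borrowed from the commented-out connection string earlier in this notebook.

```python
import configparser

# Placeholder contents mirroring the sections and keys read above.
SAMPLE_CFG = """
[AWS]
KEY = <access-key-id>
SECRET = <secret-access-key>

[IAM_ROLE]
ARN = <role-arn>

[CLUSTER]
HOST = <cluster-endpoint>
DB_NAME = <database>
DB_USER = <user>
DB_PASSWORD = <password>
DB_PORT = 5439
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

# configparser returns strings by default; getint converts numeric settings.
sample_port = sample_config.getint("CLUSTER", "DB_PORT")
sample_sections = sample_config.sections()
```

Keeping secrets in a separate file like this only helps if the file itself stays out of version control.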

In [5]:
# configure AWS credentials
os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS', 'KEY')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS', 'SECRET')

In [24]:
# print(config.get('AWS', 'KEY'), config.get('AWS', 'SECRET'))

<a id='step1'></a>

### Step 1: Scope the Project and Gather Data

#### Scope 
1. **Purpose**: Study the relationship among immigrants, temperature, population, and airports.
2. **Data**: immigrants, temperature, population, and airports data.
3. **Solution**: dimension tables for temperature, population, and airports, and fact tables for immigrants.
4. **Tools**: sqlite, Python, SQL
#### Describe and Gather Data 
- **Immigrants**: immigrants and their demographics; [Source](https://www.trade.gov/national-travel-and-tourism-office)
- **Temperature**: average temperature; [Source](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data?resource=download)
- **Population**: population and its demographics; [Source](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
- **Airports**: airport type and features; [Source](https://datahub.io/core/airport-codes#pandas)

<a id="step2"></a>

### Step 2: Explore and Assess the Data
#### Explore the Data 
[Identify data quality issues, like missing values, duplicate data, etc.](#identify)
- [airport](#airport)
- [immigration](#immigration)
- [cities](#city)
- [temperature](#temperature)
#### Cleaning Steps
[Document steps necessary to clean the data](#clean)

<a id="identify"></a>

#### 2.1 Explore the data ####

<a id="airport"></a>

[airport](https://datahub.io/core/airport-codes#pandas)

In [126]:
# Read data
# read data as "string" format to ensure all the information is correct
airport = pd.read_csv("../airport-codes_csv.csv", dtype="str")

In [6]:
# airport summary
airport.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null object
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: object(12)
memory usage: 5.0+ MB


**Notes**:
1. All columns are read as "object" (string) and need to be converted to the appropriate types.

In [7]:
# top 5
airport.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


**Notes**:
1. "iso_country" contains the country code, and "iso_region" combines the country and state codes.
2. "coordinates" needs to be split into two columns, "longitude" and "latitude".
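
To make the two notes concrete, here is a minimal pure-Python sketch of both splits applied to the first row shown above. The helper names `split_region` and `split_coordinates` are hypothetical, and rounding to four decimals is an assumption of the sketch.

```python
def split_region(iso_region):
    """Split a value such as 'US-PA' into ('US', 'PA') at the first hyphen."""
    country, _, region = iso_region.partition("-")
    return country, region


def split_coordinates(coordinates, ndigits=4):
    """Split 'longitude, latitude' text into two rounded floats."""
    lon_text, lat_text = coordinates.split(",", 1)
    # float() tolerates the space that follows the comma.
    return round(float(lon_text), ndigits), round(float(lat_text), ndigits)


country, region = split_region("US-PA")
longitude, latitude = split_coordinates("-74.93360137939453, 40.07080078125")
```

Splitting only at the first separator keeps any later hyphen or comma inside the second part instead of silently producing a third field.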

In [8]:
# airport unique values 
airport.nunique()

ident           55075
type                7
name            52144
elevation_ft     5449
continent           6
iso_country       243
iso_region       2810
municipality    27133
gps_code        40850
iata_code        9042
local_code      27436
coordinates     54874
dtype: int64

In [9]:
# NaN rows
airport[airport.isnull().any(axis=1)]

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100,,US,US-OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810,,US,US-AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038,,US,US-CA,Barstow,00CA,,00CA,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87,,US,US-CA,Biggs,00CL,,00CL,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350,,US,US-CA,Pine Valley,00CN,,00CN,"-116.4597417, 32.7273736"


**Notes**:
1. 54397 rows out of 55075 contain NaN.
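
The row count quoted above can be obtained by summing the same boolean mask that the cell uses for filtering. The sketch below shows the idea on a small, made-up frame (the values are illustrative, not taken from the airport file), together with a per-column count of missing values.

```python
import pandas as pd

# Toy frame with missing values in one column only.
toy = pd.DataFrame({
    "ident": ["00A", "00AA", "KXYZ"],
    "iata_code": [None, None, "XYZ"],
    "gps_code": ["00A", "00AA", "KXYZ"],
})

# Number of rows with at least one missing value.
rows_with_nan = int(toy.isnull().any(axis=1).sum())

# Number of missing values in each column.
nan_per_column = toy.isnull().sum().to_dict()
```

The per-column view is often more useful than the row count: a column that is almost entirely empty can be dropped as a whole instead of discarding most rows.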

In [10]:
# NaN columns
airport.columns[airport.isna().any()]

Index(['elevation_ft', 'continent', 'iso_country', 'municipality', 'gps_code',
       'iata_code', 'local_code'],
      dtype='object')

**Notes**:
1. Some columns contain NaN values.

In [11]:
# no duplicates
len(airport) == len(airport.drop_duplicates())

True

**Notes**:
1. There are no duplicates. 

<a id="immigration"></a>

[Immigration](https://www.trade.gov/national-travel-and-tourism-office)

Use PySpark to load the full dataset.

In [143]:
# configure PySpark
spark = (SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate())

In [144]:
# read the data 
immigration=spark.read.parquet("../sas_data/*.parquet")

In [8]:
# top 5
immigration.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

In [9]:
# data type
immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [65]:
# duplicates
immigration.drop_duplicates().count() == immigration.count() 

True

**Notes**:
1. "i94addr" contains the state code, which could serve as a foreign key to the airport data above.
2. Some columns, such as "occup", "entdepu", and "insnum", contain null values.
3. Several columns need to be converted to appropriate data types.
4. There are no duplicates.

<a id="city"></a>

[Cities](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

In [127]:
# cities
# read data as "string" format to ensure all the information is correct and assign delimiter ";"
cities = pd.read_csv("../us-cities-demographics.csv", delimiter=";", dtype="str")

In [16]:
# top 5
cities.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


**Notes**:
1. "State Code" can be used as a foreign key to join the above two tables.
2. Notice that "Race" and "Count" columns need to be transformed from long to wide so each race can be a separate column containing the population for that race.  
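
A minimal sketch of the long-to-wide reshaping on made-up data follows. Keying the pivot on both "City" and "State Code" is an assumption of this sketch, chosen so that cities sharing a name in different states stay separate; the sample rows are not from the dataset.

```python
import pandas as pd

# Long format: one row per (city, state, race).
long_df = pd.DataFrame({
    "City": ["Springfield", "Springfield", "Springfield", "Springfield"],
    "State Code": ["IL", "IL", "MO", "MO"],
    "Race": ["Asian", "White", "Asian", "White"],
    "Count": [10.0, 90.0, 20.0, 80.0],
})

# Wide format: one row per (city, state), one column per race.
wide_df = (
    long_df.pivot_table(index=["City", "State Code"], columns="Race",
                        values="Count", aggfunc="sum")
    .reset_index()
    .rename_axis(None, axis=1)
)
```

With "City" alone as the index, the two Springfield rows would be combined into one by the aggregation function, which is usually not what is wanted.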

In [17]:
# data information
cities.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null object
Male Population           2888 non-null object
Female Population         2888 non-null object
Total Population          2891 non-null object
Number of Veterans        2878 non-null object
Foreign-born              2878 non-null object
Average Household Size    2875 non-null object
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null object
dtypes: object(12)
memory usage: 271.1+ KB


**Notes**:
1. All columns are read as "object" (string) and need to be converted to the appropriate types.

In [18]:
# unique information
cities.nunique()

City                       567
State                       49
Median Age                 180
Male Population            593
Female Population          594
Total Population           594
Number of Veterans         577
Foreign-born               587
Average Household Size     161
State Code                  49
Race                         5
Count                     2785
dtype: int64

In [19]:
# NaN rows
cities[cities.isnull().any(axis=1)]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
111,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,Hispanic or Latino,335559
155,Caguas,Puerto Rico,40.4,34743.0,42265.0,77008,,,,PR,Hispanic or Latino,76349
258,Carolina,Puerto Rico,42.0,64758.0,77308.0,142066,,,,PR,American Indian and Alaska Native,12143
333,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,Hispanic or Latino,1066
449,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,Black or African-American,331
637,Carolina,Puerto Rico,42.0,64758.0,77308.0,142066,,,,PR,Hispanic or Latino,139967
1437,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,White,72211
1747,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,American Indian and Alaska Native,4031
1748,Mayagüez,Puerto Rico,38.1,30799.0,35782.0,66581,,,,PR,Asian,235
1995,Ponce,Puerto Rico,40.5,56968.0,64615.0,121583,,,,PR,Hispanic or Latino,120705


**Notes**:
1. 16 rows out of 2891 contain NaN, which could be dropped.

In [20]:
# NaN columns
cities.columns[cities.isna().any()]

Index(['Male Population', 'Female Population', 'Number of Veterans',
       'Foreign-born', 'Average Household Size'],
      dtype='object')

**Notes**:
1. Some columns contain NaN values.

In [21]:
# duplicates
len(cities) == len(cities.drop_duplicates())

True

**Notes**:
1. There are no duplicates.

<a id="temperature"></a>

[Temperature](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data?resource=download)

In [128]:
# temperature
# read data as "string" format to ensure all the information is correct
temperature = pd.read_csv("../GlobalLandTemperaturesByState.csv", dtype="str")

In [23]:
# top 5
temperature.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,State,Country
0,1855-05-01,25.544,1.171,Acre,Brazil
1,1855-06-01,24.228,1.103,Acre,Brazil
2,1855-07-01,24.371,1.044,Acre,Brazil
3,1855-08-01,25.427,1.073,Acre,Brazil
4,1855-09-01,25.675,1.014,Acre,Brazil


**Notes**:
1. "State" contains the names of the states in each country, which could work as a foreign key to join the "cities" table above and obtain the "State Code" needed to join the other tables.

In [24]:
# data information
temperature.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 645675 entries, 0 to 645674
Data columns (total 5 columns):
dt                               645675 non-null object
AverageTemperature               620027 non-null object
AverageTemperatureUncertainty    620027 non-null object
State                            645675 non-null object
Country                          645675 non-null object
dtypes: object(5)
memory usage: 24.6+ MB


**Notes**:
1. All columns are read as "object" (string) and need to be converted to the appropriate types.

In [25]:
# statistics
temperature.describe()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,State,Country
count,645675,620027.0,620027.0,645675,645675
unique,3239,100924.0,8040.0,241,7
top,1956-12-01,18.402,0.255,Florida,Russia
freq,241,36.0,1042.0,3239,254972


In [26]:
# unique values
temperature.nunique()

dt                                 3239
AverageTemperature               100924
AverageTemperatureUncertainty      8040
State                               241
Country                               7
dtype: int64

In [27]:
# NaN rows
temperature[temperature.isnull().any(axis=1)]

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,State,Country
14,1856-07-01,,,Acre,Brazil
20,1857-01-01,,,Acre,Brazil
21,1857-02-01,,,Acre,Brazil
22,1857-03-01,,,Acre,Brazil
23,1857-04-01,,,Acre,Brazil
24,1857-05-01,,,Acre,Brazil
25,1857-06-01,,,Acre,Brazil
26,1857-07-01,,,Acre,Brazil
27,1857-08-01,,,Acre,Brazil
28,1857-09-01,,,Acre,Brazil


**Notes**:
1. 25648 rows out of 645675 contain NaN.

In [28]:
# NaN columns
temperature.columns[temperature.isna().any()]

Index(['AverageTemperature', 'AverageTemperatureUncertainty'], dtype='object')

**Notes**:
1. Some columns contain NaN values.

In [29]:
# duplicates
len(temperature) == len(temperature.drop_duplicates())

True

**Notes**:
1. There are no duplicates.

<a id="clean"></a>

#### 2.2 Cleaning Steps

##### airport
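1. Convert "elevation_ft" to numeric.
2. Split "iso_region" into two columns ("country" and "region").
3. Split "coordinates" into two columns ("longitude" and "latitude") and round to 4 decimals.
4. Keep only rows whose type contains "airport"; drop closed airports and other facility types.
5. Keep only US airports.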

In [129]:
# US airports: keep open facilities whose type contains "airport"
# .copy() returns an independent DataFrame, so the column assignments below
# do not trigger SettingWithCopyWarning
airportUS = (airport.loc[(airport.iso_country == "US") 
                         & ~(airport.type == "closed") 
                         & airport.type.str.contains("airport")].copy()) 

In [130]:
# change "elevation_ft" to numeric
airportUS["elevation_ft"] = airportUS["elevation_ft"].astype(float)

In [131]:
# separate "iso_region" into "country" and "region"
airportUS[["country", "region"]] = airportUS["iso_region"].str.split("-", n=1, expand=True)

In [132]:
# separate "coordinates" into "longitude" and "latitude" and round to 4 decimals
airportUS[["longitude", "latitude"]] = airportUS["coordinates"].str.split(",", n=1, expand=True).astype(float).round(4)


In [133]:
# drop unnecessary columns 
airportUSNew = (airportUS.drop(["iso_country", "iso_region", "coordinates", "country", 
                                "continent","iata_code", "gps_code", "local_code", "municipality"], 
                               axis=1))

In [35]:
# NaN rows
airportUSNew[airportUSNew.isnull().any(axis=1)]

Unnamed: 0,ident,type,name,elevation_ft,region,longitude,latitude
6875,63CA,small_airport,Desert Air Sky Ranch Airport,,CA,-115.8740,33.4811
6981,65LA,small_airport,Southern Seaplane Airport,,LA,-90.0222,29.8661
7653,6XA4,small_airport,Zadow Airstrip,,TX,-95.9544,29.9917
9006,8AL3,small_airport,Fricks Field Airport,,AL,-86.0886,34.1418
9910,9CL9,small_airport,Spezia Airport,,CA,-121.5340,38.2166
12375,BOBS,small_airport,Bobs,,AR,-91.4437,35.3915
12378,BONI,small_airport,Bonita,,LA,-91.7114,32.8944
12788,BUTL,small_airport,Butlerville Field,,AR,-91.8312,34.9697
14544,CHAN,small_airport,A & C AG Aviation Inc,,GA,-83.6929,32.1711
15740,CRAI,small_airport,Craig's field,,LA,-91.9192,30.4962


In [36]:
# len(airportUSNew) ## 14582 

Only 63 rows out of 14582 rows contain NaN, which could be dropped.

In [134]:
# drop NaN
airportUSNewNoNa = airportUSNew.dropna()

In [38]:
# NaN rows
#airportUSNewNoNa[airportUSNewNoNa.isnull().any(axis=1)]

In [39]:
# duplicates
len(airportUSNewNoNa) == len(airportUSNewNoNa.drop_duplicates())

True

In [40]:
# data information
airportUSNewNoNa.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 14519 entries, 1 to 54896
Data columns (total 7 columns):
ident           14519 non-null object
type            14519 non-null object
name            14519 non-null object
elevation_ft    14519 non-null float64
region          14519 non-null object
longitude       14519 non-null float64
latitude        14519 non-null float64
dtypes: float64(3), object(4)
memory usage: 907.4+ KB


In [41]:
# top 5
airportUSNewNoNa.head()

Unnamed: 0,ident,type,name,elevation_ft,region,longitude,latitude
1,00AA,small_airport,Aero B Ranch Airport,3435.0,KS,-101.4739,38.704
2,00AK,small_airport,Lowell Field,450.0,AK,-151.696,59.9492
3,00AL,small_airport,Epps Airpark,820.0,AL,-86.7703,34.8648
5,00AS,small_airport,Fulton Airport,1100.0,OK,-97.818,34.9428
6,00AZ,small_airport,Cordes Airport,3810.0,AZ,-112.165,34.3056


In [63]:
# save as CSV without header and index, then upload it to AWS S3
airportUSNewNoNa.to_csv("airportUS.csv", sep=",", header=False, index=False, encoding="utf-8")
s3 = boto3.resource('s3')
s3.meta.client.upload_file('airportUS.csv', 'udacity-capstone-project-bucket-test', 'airportUS.csv')

##### cities
1. Transform columns "Race" and "Count" from long to wide
2. To numeric: "Male Population", "Female Population", "Total Population", "Number of Veterans", "Foreign-born", "Count"
3. To float: "Median Age", "Average Household Size"

In [18]:
# NaN rows
cities[cities.isnull().any(axis=1)]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
111,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,Hispanic or Latino,335559
155,Caguas,Puerto Rico,40.4,34743.0,42265.0,77008,,,,PR,Hispanic or Latino,76349
258,Carolina,Puerto Rico,42.0,64758.0,77308.0,142066,,,,PR,American Indian and Alaska Native,12143
333,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,Hispanic or Latino,1066
449,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,Black or African-American,331
637,Carolina,Puerto Rico,42.0,64758.0,77308.0,142066,,,,PR,Hispanic or Latino,139967
1437,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,White,72211
1747,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,American Indian and Alaska Native,4031
1748,Mayagüez,Puerto Rico,38.1,30799.0,35782.0,66581,,,,PR,Asian,235
1995,Ponce,Puerto Rico,40.5,56968.0,64615.0,121583,,,,PR,Hispanic or Latino,120705


In [19]:
# len(cities[cities.isnull().any(axis=1)]) ## 16 rows are blank

In [20]:
# len(cities) ## 2891

16 rows out of 2891 contain missing values. They belong to Puerto Rico and to The Villages, Florida, so these rows are dropped.

In [135]:
# drop NaN; .copy() avoids SettingWithCopyWarning on the assignment below
citiesNoNa = cities.dropna().copy()

In [136]:
# cast "Count" to float before pivoting, otherwise the pivot raises an error
citiesNoNa["Count"] = citiesNoNa["Count"].astype(float)


In [137]:
# transform from long to wide
citiesPop = citiesNoNa[['City', 'State', 'Median Age', 'Male Population',
       'Female Population', 'Total Population', 'Number of Veterans',
       'Foreign-born', 'Average Household Size', 'State Code']]
citiesRace = (pd.pivot_table(citiesNoNa[["City", "Race", "Count"]], 
                             index="City", columns = "Race", values= "Count")
                          .reset_index().reset_index(drop=True).rename_axis(None, axis=1))
citiesNew = citiesPop.merge(citiesRace, on="City", how="left")

In [24]:
# citiesNew.head()

In [138]:
# to integer 
citiesNew[["Male Population", "Female Population", 
           "Total Population", "Number of Veterans", "Foreign-born"]] = (citiesNew[["Male Population", "Female Population", 
                                                        "Total Population", "Number of Veterans", "Foreign-born"
                                                        ]].astype(int))

In [139]:
# to float since some rows contain NaN
# show errors if converting to int
citiesNew[["American Indian and Alaska Native", "Asian", 
           "Black or African-American", "Hispanic or Latino", "White"]] = (citiesNew[["American Indian and Alaska Native", "Asian", 
                                                                                      "Black or African-American", "Hispanic or Latino", "White"]].astype(float))

In [140]:
# to float
citiesNew[["Median Age", "Average Household Size"]] = (citiesNew[["Median Age", "Average Household Size"]].astype(float))

In [28]:
# NaN rows
citiesNew[citiesNew.isnull().any(axis=1)]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,,4759.0,18191.0,3430.0,61869.0
15,Fort Myers,Florida,37.3,36850,37165,74015,4312,15365,2.45,FL,,3519.0,17298.0,17866.0,50169.0
35,Miami Gardens,Florida,34.9,50719,62480,113199,2327,33394,3.75,FL,,,85300.0,23287.0,27273.0
38,North Little Rock,Arkansas,33.6,31671,34835,66506,4130,2787,2.62,AR,713.0,,30766.0,4860.0,34118.0
44,New Britain,Connecticut,33.4,37350,35459,72809,2219,15080,2.52,CT,,2297.0,9873.0,30188.0,56431.0
49,Camden,New Jersey,27.9,36437,39694,76131,1425,11317,3.00,NJ,,1686.0,35237.0,41473.0,19907.0
71,Lynwood,California,29.4,35634,36371,72005,776,28061,4.43,CA,,994.0,5346.0,63377.0,48670.0
134,Alafaya,Florida,33.5,39504,45760,85264,4176,15842,2.94,FL,,10336.0,6577.0,34897.0,63666.0
138,Miami Beach,Florida,42.5,48090,44221,92311,2265,49908,2.10,FL,,2772.0,4013.0,47446.0,72180.0
156,Champaign,Illinois,28.7,43326,42760,86086,3734,12261,2.25,IL,,11746.0,16163.0,4015.0,58099.0


In [29]:
# len(citiesNew) ## 2875

212 rows out of 2875 contain NaN. The NaN values are confined to the race columns (mostly "American Indian and Alaska Native"), so these rows are kept.

In [30]:
# duplicates
len(citiesNew) == len(citiesNew.drop_duplicates())

False

In [31]:
#len(citiesNew.drop_duplicates()) ## 588

In [141]:
# drop duplicates
citiesNewNoDup = citiesNew.drop_duplicates()

In [33]:
# data information
citiesNewNoDup.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 588 entries, 0 to 2711
Data columns (total 15 columns):
City                                 588 non-null object
State                                588 non-null object
Median Age                           588 non-null float64
Male Population                      588 non-null int64
Female Population                    588 non-null int64
Total Population                     588 non-null int64
Number of Veterans                   588 non-null int64
Foreign-born                         588 non-null int64
Average Household Size               588 non-null float64
State Code                           588 non-null object
American Indian and Alaska Native    537 non-null float64
Asian                                581 non-null float64
Black or African-American            583 non-null float64
Hispanic or Latino                   588 non-null float64
White                                588 non-null float64
dtypes: float64(7), int64(5), object(

In [62]:
# top 5
citiesNewNoDup.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,1084.0,8841.0,21330.0,25924.0,37756.0
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,351.0,30473.0,3917.0,2566.0,58723.0
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,,4759.0,18191.0,3430.0,61869.0
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,2789.0,24519.0,24437.0,65823.0,111832.0
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,2268.0,7349.0,144961.0,100432.0,76402.0


In [64]:
# save as CSV without header and index, then upload it to AWS S3
citiesNewNoDup.to_csv("citiesUS.csv", sep=",", header=False, index=False, encoding="utf-8")
s3 = boto3.resource('s3')
s3.meta.client.upload_file('citiesUS.csv', 'udacity-capstone-project-bucket-test', 'citiesUS.csv')

##### immigration
1. Convert "arrdate", "depdate", "dtadfile", "dtaddto" to date.
2. Convert "cicid", "i94cit", "i94res", "admnum" to text.
3. Convert "i94yr", "i94mon", "i94mode", "i94bir", "i94visa", "count", "biryear" to integer.

Most of the null values occur in the "visapost", "occup", "entdepu", and "insnum" columns, which are not needed for the analysis.

In [145]:
# drop unnecessary columns
immigrationNew = immigration.drop("visapost", "occup", "entdepu", "insnum")

In [66]:
# immigrationNew.show(5)

In [10]:
# 13 rows out of 1000 contain "D/S" value for "dtaddto"
# needs to be dropped, then can change to date format
immigrationNew.filter(immigrationNew.dtaddto == "D/S").show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-------+-------+-------+-------+-------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|entdepa|entdepd|matflag|biryear|dtaddto|gender|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-------+-------+-------+-------+-------+------+-------+--------------+-----+--------+
|5748903.0|2016.0|   4.0| 249.0| 111.0|    SFR|20574.0|    1.0|     CA|20625.0|  19.0|    3.0|  1.0|20160430|      G|      O|      M| 1997.0|    D/S|     F|     AF|9.499984463E10|00084|      F1|
|5748931.0|2016.0|   4.0| 249.0| 249.0|    DET|20574.0|    1.0|     MI|20627.0|  23.0|    3.0|  1.0|20160430|      G|      O|      M| 1993.0|    D/S|     M|     DL|9.500494123E10|00019|      F1|
|5748948.0|2016.0|   4.0|

In [146]:
# drop rows where "dtaddto" equals "D/S" so the column can be parsed as a date
# note: this comparison also drops rows where "dtaddto" is null
immigrationPlus = immigrationNew.filter(immigrationNew.dtaddto != "D/S")
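
One thing to keep in mind with this filter: under SQL three-valued logic, which Spark SQL also follows, comparing a null value with `!=` yields null rather than true, so rows with a null "dtaddto" are filtered out together with the "D/S" rows. The sketch below reproduces the effect with `sqlite3` as a lightweight stand-in for Spark; the `visits` table and its three rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE visits (cicid INTEGER, dtaddto TEXT)")
conn.executemany("INSERT INTO visits VALUES (?, ?)",
                 [(1, "10292016"), (2, "D/S"), (3, None)])

# Plain inequality: the NULL row is dropped along with the 'D/S' row.
kept_plain = [row[0] for row in conn.execute(
    "SELECT cicid FROM visits WHERE dtaddto != 'D/S' ORDER BY cicid")]

# Explicit NULL handling keeps the row whose dtaddto is missing.
kept_with_null = [row[0] for row in conn.execute(
    "SELECT cicid FROM visits "
    "WHERE dtaddto != 'D/S' OR dtaddto IS NULL ORDER BY cicid")]
conn.close()
```

Whether the null rows should be kept depends on the analysis; the point is only that the choice is worth making deliberately.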

In [12]:
# top 5
immigrationPlus.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-------+-------+-------+-------+--------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|entdepa|entdepd|matflag|biryear| dtaddto|gender|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+-------+-------+-------+-------+--------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|      G|      O|      M| 1976.0|10292016|     F|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  1.0|20160430|      G|      O|      M| 1984.0|10292016|     F|     VA|9.495562283E10|00007|      B1|
|5748519.0|2016.0|  

In [70]:
# schema
immigrationPlus.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



Some data formats need to be changed.

In [147]:
# change to date
immigrationPlus = (immigrationPlus.withColumn("arrdate", expr("date_add('1960-01-01', arrdate)")).
                                    withColumn("depdate", expr("date_add('1960-01-01', depdate)")).
                                    withColumn("dtadfile", expr("to_date(dtadfile,'yyyyMMdd')")).
                                    withColumn("dtaddto", expr("to_date(dtaddto,'MMddyyyy')"))
                  )
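
The `date_add('1960-01-01', ...)` expressions above treat `arrdate` and `depdate` as day counts from 1 January 1960, the SAS date epoch. A minimal plain-Python sketch of the same arithmetic, using values from the sample rows shown earlier (the helper names `sas_days_to_date` and `parse_mmddyyyy` are hypothetical and exist only for illustration):

```python
from datetime import date, datetime, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date system


def sas_days_to_date(days):
    """Convert a SAS day count (possibly a float such as 20574.0) to a date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


def parse_mmddyyyy(text):
    """Parse an 'MMddyyyy' string such as '10292016'; return None if it is not a date."""
    try:
        return datetime.strptime(text, "%m%d%Y").date()
    except (TypeError, ValueError):
        return None


print(sas_days_to_date(20574.0))   # 2016-04-30
print(sas_days_to_date(20582.0))   # 2016-05-08
print(parse_mmddyyyy("10292016"))  # 2016-10-29
print(parse_mmddyyyy("D/S"))       # None
```

The results agree with the converted `arrdate`, `depdate`, and `dtaddto` values in the first sample row of the converted table.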

In [14]:
# immigrationPlus.show(5)

In [148]:
# change to int
immigrationPlus = (immigrationPlus.withColumn("i94yr", col("i94yr").cast("float").cast("integer")).
                                     withColumn("i94mon", col("i94mon").cast("float").cast("integer")).
                                     withColumn("i94mode", col("i94mode").cast("float").cast("integer")).
                                     withColumn("i94bir", col("i94bir").cast("float").cast("integer")).
                                     withColumn("i94visa", col("i94visa").cast("float").cast("integer")).
                                     withColumn("count", col("count").cast("float").cast("integer")).
                                     withColumn("biryear", col("biryear").cast("float").cast("integer"))
                    )

In [149]:
# change to text
# use raw strings and anchor the pattern so only a trailing ".0" is removed
immigrationPlus = (immigrationPlus.withColumn("cicid", regexp_replace(col('cicid').cast("string"), r'\.0$', '')).
                    withColumn("i94cit", regexp_replace(col('i94cit').cast("string"), r'\.0$', '')).
                    withColumn("i94res", regexp_replace(col('i94res').cast("string"), r'\.0$', '')).
                    withColumn("admnum", col('admnum').cast("bigint").cast("string")))
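
When stripping the `.0` suffix from a number rendered as text, it matters whether the pattern is anchored to the end of the string. The sketch below uses Python's `re` module rather than Spark, purely to illustrate the difference; the function names are hypothetical, and the value `245.05` is made up to show the failure mode.

```python
import re


def strip_trailing_zero(text):
    """Remove a trailing '.0' only, e.g. '5748517.0' -> '5748517'."""
    return re.sub(r"\.0$", "", text)


def strip_any_dot_zero(text):
    """Unanchored variant: removes every '.0', wherever it occurs."""
    return re.sub(r"\.0", "", text)


print(strip_trailing_zero("5748517.0"))  # 5748517
print(strip_any_dot_zero("245.05"))      # 2455 (digits silently merged)
print(strip_trailing_zero("245.05"))     # 245.05 (left untouched)

# Values shown in scientific notation need a numeric cast instead,
# which is why "admnum" goes through bigint in the cell above.
print(int(float("9.495387003E10")))      # 94953870030
```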

In [89]:
# duplicates 
immigrationPlus.count() == immigrationPlus.drop_duplicates().count()

True

In [74]:
# data information
immigrationPlus.printSchema()

root
 |-- cicid: string (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- dtadfile: date (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [75]:
# top 5
immigrationPlus.show(5)

+-------+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+----------+-------+-------+-------+-------+----------+------+-------+-----------+-----+--------+
|  cicid|i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|count|  dtadfile|entdepa|entdepd|matflag|biryear|   dtaddto|gender|airline|     admnum|fltno|visatype|
+-------+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+----------+-------+-------+-------+-------+----------+------+-------+-----------+-----+--------+
|5748517| 2016|     4|   245|   438|    LOS|2016-04-30|      1|     CA|2016-05-08|    40|      1|    1|2016-04-30|      G|      O|      M|   1976|2016-10-29|     F|     QF|94953870030|00011|      B1|
|5748518| 2016|     4|   245|   438|    LOS|2016-04-30|      1|     NV|2016-05-17|    32|      1|    1|2016-04-30|      G|      O|      M|   1984|2016-10-29|     F|     VA|94955622830|00007|      B1|


In [55]:
# immigrationPlus.select("i94yr").distinct().show() ## all the data are in 2016

In [76]:
# "i94addr" contain 458 distinct values
immigrationPlus.select("i94addr").distinct().count()

458

In [34]:
# there are only 48 distinct state codes in "cities"
len(sorted(citiesNewNoDup["State Code"].drop_duplicates()))

48

In [150]:
# list of the state codes in "cities", used below to filter "i94addr"
usStates = sorted(citiesNewNoDup["State Code"].drop_duplicates())

In [151]:
# keep only the records whose "i94addr" is one of the "cities" state codes
immigrationPlusNew = immigrationPlus.filter(immigrationPlus.i94addr.isin(usStates))

In [80]:
# "i94addr" distinct values
immigrationPlusNew.select("i94addr").distinct().count()

48

In [81]:
# check
immigrationPlusNew.select("i94addr").distinct().show(50)

+-------+
|i94addr|
+-------+
|     SC|
|     AZ|
|     LA|
|     MN|
|     NJ|
|     DC|
|     OR|
|     VA|
|     RI|
|     KY|
|     NH|
|     MI|
|     NV|
|     WI|
|     ID|
|     CA|
|     CT|
|     NE|
|     MT|
|     NC|
|     MD|
|     DE|
|     MO|
|     IL|
|     ME|
|     WA|
|     ND|
|     MS|
|     AL|
|     IN|
|     OH|
|     TN|
|     NM|
|     IA|
|     PA|
|     SD|
|     NY|
|     TX|
|     GA|
|     MA|
|     KS|
|     FL|
|     CO|
|     AK|
|     AR|
|     OK|
|     UT|
|     HI|
+-------+



In [58]:
# immigrationPlusNew.count()

In [57]:
## save the data to a parquet file in s3
(immigrationPlusNew.write
    .mode("overwrite")
    .parquet("s3a://udacity-capstone-project-bucket-test/immigration.parquet"))

##### temperature
1. Only include temperature records from 2003 onwards.
2. Only include records for the United States.

In [78]:
# year range
# print(min(temperature.dt), max(temperature.dt)) ## 1743-11-01, 2013-09-01

In [85]:
# temperature in 2003 and onwards and in US
temperatureUS10Yr = temperature[(temperature.Country == "United States") 
            & (temperature.dt >= "2003-01-01")].drop("Country", axis=1)

In [86]:
# Uncertainty
#print(min(temperatureUS10Yr.AverageTemperatureUncertainty), max(temperatureUS10Yr.AverageTemperatureUncertainty))
## 0.049 1.3219999999999998

In [87]:
# NaN rows
temperatureUS10Yr[temperatureUS10Yr.isnull().any(axis=1)]

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,State
15106,2013-09-01,,,Alaska
154725,2013-09-01,,,Hawaii


In [88]:
# len(temperatureUS10Yr) ##6579

Only 2 of the 6,579 rows contain NaN, so they are dropped.

In [89]:
# drop NaN
temperatureUS10YrNoNa = temperatureUS10Yr.dropna() 

In [90]:
# change to date
temperatureUS10YrNoNa["dt"] = pd.to_datetime(temperatureUS10YrNoNa["dt"], format="%Y-%m-%d")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  


In [91]:
# to float and round to 1
temperatureUS10YrNoNa[["AverageTemperature", "AverageTemperatureUncertainty"]] = temperatureUS10YrNoNa[["AverageTemperature", "AverageTemperatureUncertainty"]].astype(float).round(1)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  self[k1] = value[k2]
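
Both warnings above arise because `temperatureUS10YrNoNa` was derived from a filtered slice of another DataFrame. One common way to avoid them is to take an explicit `.copy()` before assigning columns. A minimal sketch, assuming a tiny made-up frame with the same column names (the data values are illustrative and not taken from the dataset):

```python
import pandas as pd

# Illustrative stand-in for the raw temperature data (values are made up).
raw = pd.DataFrame({
    "dt": ["2002-12-01", "2003-01-01", "2013-09-01"],
    "AverageTemperature": [5.123, 4.768, None],
    "AverageTemperatureUncertainty": [0.21, 0.13, None],
    "State": ["Alabama", "Alabama", "Alaska"],
    "Country": ["United States"] * 3,
})

# Filter, drop NaN rows, then take an explicit copy so that later
# assignments modify an independent DataFrame rather than a slice of `raw`.
clean = (
    raw[(raw.Country == "United States") & (raw.dt >= "2003-01-01")]
    .drop("Country", axis=1)
    .dropna()
    .copy()
)

clean["dt"] = pd.to_datetime(clean["dt"], format="%Y-%m-%d")
num_cols = ["AverageTemperature", "AverageTemperatureUncertainty"]
clean[num_cols] = clean[num_cols].astype(float).round(1)

print(clean)
```

The transformations are the same as in the cells above; only the explicit copy is new.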


In [92]:
# duplicates
len(temperatureUS10YrNoNa) == len(temperatureUS10YrNoNa.drop_duplicates())

True

In [93]:
# data information
temperatureUS10YrNoNa.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 6577 entries, 10568 to 626439
Data columns (total 4 columns):
dt                               6577 non-null datetime64[ns]
AverageTemperature               6577 non-null float64
AverageTemperatureUncertainty    6577 non-null float64
State                            6577 non-null object
dtypes: datetime64[ns](1), float64(2), object(1)
memory usage: 256.9+ KB


In [88]:
# top 5
temperatureUS10YrNoNa.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,State
10568,2003-01-01,4.8,0.1,Alabama
10569,2003-02-01,9.1,0.2,Alabama
10570,2003-03-01,14.6,0.2,Alabama
10571,2003-04-01,17.8,0.2,Alabama
10572,2003-05-01,22.6,0.2,Alabama


In [94]:
# save as a CSV without header and index, then upload it to AWS S3
temperatureUS10YrNoNa.to_csv("temperatureUS.csv", sep=",", header=False, index=False, encoding="utf-8")
s3 = boto3.resource('s3')
s3.meta.client.upload_file('temperatureUS.csv', 'udacity-capstone-project-bucket-test', 'temperatureUS.csv')

<a id="model"></a>

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
* STAR Schema
* dimension tables
    * dim_airports ("region", PK; "totalAirport")
    * dim_temperature ("State Code", PK; "FallAvgTemp"; "SummerAvgTemp"; "SpringAvgTemp"; "WinterAvgTemp")
    * dim_cities ("State Code", PK; age, gender, and race populations, household size, etc.)
* fact tables
    * fact_immigration ("cicid", PK; "i94addr", FK)

#### 3.2 Mapping Out Data Pipelines

<!-- ![title](airport_data_dictionary.png) -->

##### Relationship between tables

"State Code" in "cities", "region" in "airports", and "i94addr" in "immigration" all hold US state codes, so the state code can serve as the key to join these three tables. "State" in "temperature" and "State" in "cities" both hold US state names, so these two tables can be joined on the state name in order to add "State Code" to "temperature".  
"State Code" then links all the tables together to create the relevant dimension and fact tables.

##### State Codes

In [152]:
# airport region
np.array(sorted(airportUSNewNoNa.region.unique()))

array(['AK', 'AL', 'AR', 'AZ', 'CA', 'CO', 'CT', 'DC', 'DE', 'FL', 'GA',
       'HI', 'IA', 'ID', 'IL', 'IN', 'KS', 'KY', 'LA', 'MA', 'MD', 'ME',
       'MI', 'MN', 'MO', 'MS', 'MT', 'NC', 'ND', 'NE', 'NH', 'NJ', 'NM',
       'NV', 'NY', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX',
       'U-A', 'UT', 'VA', 'VT', 'WA', 'WI', 'WV', 'WY'], 
      dtype='<U3')

In [153]:
# city state
np.array(sorted(citiesNewNoDup["State Code"].unique()))

array(['AK', 'AL', 'AR', 'AZ', 'CA', 'CO', 'CT', 'DC', 'DE', 'FL', 'GA',
       'HI', 'IA', 'ID', 'IL', 'IN', 'KS', 'KY', 'LA', 'MA', 'MD', 'ME',
       'MI', 'MN', 'MO', 'MS', 'MT', 'NC', 'ND', 'NE', 'NH', 'NJ', 'NM',
       'NV', 'NY', 'OH', 'OK', 'OR', 'PA', 'RI', 'SC', 'SD', 'TN', 'TX',
       'UT', 'VA', 'WA', 'WI'], 
      dtype='<U2')

In [154]:
# state codes in cities but not in airports
set(citiesNewNoDup["State Code"]) - set(airportUSNewNoNa.region)

set()

In [155]:
# state codes in airports but not in cities
set(airportUSNewNoNa.region) - set(citiesNewNoDup["State Code"])

{'U-A', 'VT', 'WV', 'WY'}

1. All the state codes in "cities" are in "airports".  
2. {'U-A', 'VT', 'WV', 'WY'} are in "airports" but not in "cities".
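
The two set differences above can be wrapped in one small helper so that both directions are always checked together. This is a sketch only: `compare_keys` is a hypothetical name, and the lists are short illustrative subsets of the codes printed above.

```python
def compare_keys(left, right):
    """Return (only_in_left, only_in_right) for two collections of keys."""
    left_set, right_set = set(left), set(right)
    return left_set - right_set, right_set - left_set


# Small illustrative subsets of the state codes listed above.
cities_codes = ["AK", "AL", "UT", "VA", "WA", "WI"]
airport_codes = ["AK", "AL", "U-A", "UT", "VA", "VT", "WA", "WI", "WV", "WY"]

only_cities, only_airports = compare_keys(cities_codes, airport_codes)
print(sorted(only_cities))    # []
print(sorted(only_airports))  # ['U-A', 'VT', 'WV', 'WY']
```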

In [159]:
# immigration i94addr
immigrationStates = immigrationPlusNew.select("i94addr").distinct().toPandas()

In [160]:
# top 5
immigrationStates.head()

Unnamed: 0,i94addr
0,SC
1,AZ
2,LA
3,MN
4,NJ


In [161]:
# state codes in cities but not in immigration
set(citiesNewNoDup["State Code"]) - set(immigrationStates["i94addr"].astype("str"))

set()

In [162]:
# state codes in immigration but not in cities
set(immigrationStates["i94addr"].astype("str")) - set(citiesNewNoDup["State Code"])

set()

1. "cities" and "immigration" contain exactly the same state codes.

##### State Names

In [96]:
# state names in "cities" 
np.array(sorted(citiesNewNoDup.State.unique()))

array(['Alabama', 'Alaska', 'Arizona', 'Arkansas', 'California',
       'Colorado', 'Connecticut', 'Delaware', 'District of Columbia',
       'Florida', 'Georgia', 'Hawaii', 'Idaho', 'Illinois', 'Indiana',
       'Iowa', 'Kansas', 'Kentucky', 'Louisiana', 'Maine', 'Maryland',
       'Massachusetts', 'Michigan', 'Minnesota', 'Mississippi',
       'Missouri', 'Montana', 'Nebraska', 'Nevada', 'New Hampshire',
       'New Jersey', 'New Mexico', 'New York', 'North Carolina',
       'North Dakota', 'Ohio', 'Oklahoma', 'Oregon', 'Pennsylvania',
       'Rhode Island', 'South Carolina', 'South Dakota', 'Tennessee',
       'Texas', 'Utah', 'Virginia', 'Washington', 'Wisconsin'],
      dtype='<U20')

In [97]:
# state names in "temperature" 
np.array(sorted(temperatureUS10YrNoNa.State.unique()))

array(['Alabama', 'Alaska', 'Arizona', 'Arkansas', 'California',
       'Colorado', 'Connecticut', 'Delaware', 'District Of Columbia',
       'Florida', 'Georgia (State)', 'Hawaii', 'Idaho', 'Illinois',
       'Indiana', 'Iowa', 'Kansas', 'Kentucky', 'Louisiana', 'Maine',
       'Maryland', 'Massachusetts', 'Michigan', 'Minnesota',
       'Mississippi', 'Missouri', 'Montana', 'Nebraska', 'Nevada',
       'New Hampshire', 'New Jersey', 'New Mexico', 'New York',
       'North Carolina', 'North Dakota', 'Ohio', 'Oklahoma', 'Oregon',
       'Pennsylvania', 'Rhode Island', 'South Carolina', 'South Dakota',
       'Tennessee', 'Texas', 'Utah', 'Vermont', 'Virginia', 'Washington',
       'West Virginia', 'Wisconsin', 'Wyoming'], dtype='<U20')

In [98]:
# state names in "cities" but not in "temperature"
set(citiesNewNoDup.State) - set(temperatureUS10YrNoNa.State)

{'District of Columbia', 'Georgia'}

In [99]:
# state names in "temperature" but not in "cities"
set(temperatureUS10YrNoNa.State) - set(citiesNewNoDup.State)

{'District Of Columbia',
 'Georgia (State)',
 'Vermont',
 'West Virginia',
 'Wyoming'}

1. In "temperature", change "District Of Columbia" to "District of Columbia" and "Georgia (State)" to "Georgia" so that the names match "cities".
2. {'Vermont', 'West Virginia', 'Wyoming'} are in "temperature" but not in "cities".
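
The renaming in point 1 can be expressed as a small lookup applied before state names are mapped to state codes. A minimal sketch, assuming a hand-written name-to-code dictionary (`STATE_NAME_FIXES`, `normalize_state`, `to_state_code`, and the sample mapping are hypothetical names used only for illustration):

```python
# Spelling differences found in the comparison above.
STATE_NAME_FIXES = {
    "Georgia (State)": "Georgia",
    "District Of Columbia": "District of Columbia",
}


def normalize_state(name):
    """Return the spelling used in "cities" for a "temperature" state name."""
    return STATE_NAME_FIXES.get(name, name)


def to_state_code(name, name_to_code):
    """Map a state name to its code; names without a match give None."""
    return name_to_code.get(normalize_state(name))


# Illustrative excerpt of a name-to-code mapping built from "cities".
name_to_code = {"Alabama": "AL", "Georgia": "GA", "District of Columbia": "DC"}

for state in ["Alabama", "Georgia (State)", "District Of Columbia", "Vermont"]:
    print(state, "->", to_state_code(state, name_to_code))
```

Names such as "Vermont" that have no entry in "cities" map to `None`, which mirrors their absence from the joined tables.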

##### Staging tables

**Redshift Data Warehouse**

##### Airports

In [40]:
# redshift connection
conn_string="postgresql://{}:{}@{}:{}/{}".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT,DWH_DB)
#print(conn_string)
%sql $conn_string

'Connected: udacity-data-pipelines@dev'

In [91]:
%%sql
DROP TABLE IF EXISTS airports;
CREATE TABLE IF NOT EXISTS airports
               (airport_id INT IDENTITY(0, 1) PRIMARY KEY,
               ident VARCHAR NOT NULL, 
               type VARCHAR NOT NULL, 
               name VARCHAR NOT NULL,
               elevation_ft FLOAT4 NOT NULL, 
               region VARCHAR NOT NULL,
               longitude FLOAT8 NOT NULL,
               latitude FLOAT8 NOT NULL);

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
Done.
Done.


[]

In [92]:
query = """COPY airports (ident, type, name, elevation_ft, region, longitude, latitude)
FROM 's3://udacity-capstone-project-bucket-test/airportUS.csv'
credentials 'aws_access_key_id={};aws_secret_access_key={}' CSV;""".format(KEY, SECRET)
%sql $query

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
Done.


[]
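
The COPY statement above embeds the access key and secret in the query text. Redshift's COPY command also accepts an `IAM_ROLE` clause, which keeps secrets out of the SQL. The sketch below only builds the statement string. `build_copy_statement` is a hypothetical helper, the ARN shown is a placeholder, and it assumes the role (for example the `ARN` value read from `dwh.cfg` at the top of the notebook) is attached to the cluster and allowed to read the bucket.

```python
def build_copy_statement(table, s3_path, iam_role_arn, columns=None, data_format="CSV"):
    """Build a Redshift COPY statement that authenticates with an IAM role."""
    column_list = " ({})".format(", ".join(columns)) if columns else ""
    return (
        "COPY {table}{cols}\n"
        "FROM '{path}'\n"
        "IAM_ROLE '{arn}'\n"
        "FORMAT AS {fmt};"
    ).format(table=table, cols=column_list, path=s3_path,
             arn=iam_role_arn, fmt=data_format)


# Placeholder ARN for illustration only.
example_arn = "arn:aws:iam::123456789012:role/exampleRedshiftRole"

query = build_copy_statement(
    "airports",
    "s3://udacity-capstone-project-bucket-test/airportUS.csv",
    example_arn,
    columns=["ident", "type", "name", "elevation_ft", "region", "longitude", "latitude"],
)
print(query)
```

The same helper with `data_format="PARQUET"` would cover the Parquet load of the immigration data.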

In [93]:
%%sql
SELECT * FROM airports LIMIT 5;

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


airport_id,ident,type,name,elevation_ft,region,longitude,latitude
2,00AK,small_airport,Lowell Field,450.0,AK,-151.696,59.9492
4,00AL,small_airport,Epps Airpark,820.0,AL,-86.7703,34.8648
10,00CA,small_airport,Goldstone /Gts/ Airport,3038.0,CA,-116.888,35.3505
16,00FL,small_airport,River Oak Airport,35.0,FL,-80.9692,27.2309
18,00GA,small_airport,Lt World Airport,700.0,GA,-84.0683,33.7675


##### Cities

In [94]:
%%sql
DROP TABLE IF EXISTS cities;
CREATE TABLE IF NOT EXISTS cities 
               (city_id INT IDENTITY(0, 1) PRIMARY KEY,
               City VARCHAR NOT NULL, 
               State VARCHAR NOT NULL, 
               "Median Age" FLOAT4 NOT NULL,
               "Male Population" INT NOT NULL, 
               "Female Population" INT NOT NULL,
               "Total Population" INT NOT NULL,
               "Number of Veterans" INT NOT NULL,
               "Foreign-born" INT NOT NULL,
               "Average Household Size" FLOAT4 NOT NULL,
               "State Code" VARCHAR NOT NULL,
               "American Indian and Alaska Native" FLOAT4,
               "Asian" FLOAT4,
               "Black or African-American" FLOAT4,
               "Hispanic or Latino" FLOAT4 NOT NULL,
               "White" FLOAT4 NOT NULL);

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
Done.
Done.


[]

In [95]:
query = """COPY cities 
FROM 's3://udacity-capstone-project-bucket-test/citiesUS.csv'
credentials 'aws_access_key_id={};aws_secret_access_key={}'
CSV;""".format(KEY, SECRET)
%sql $query

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
Done.


[]

In [49]:
# %%sql
# SELECT * FROM stl_load_errors;

In [96]:
%%sql
SELECT * FROM cities LIMIT 5;

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


airport_id,city,state,median age,male population,female population,total population,number of veterans,foreign-born,average household size,state code,american indian and alaska native,asian,black or african-american,hispanic or latino,white
1,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,1084.0,8841.0,21330.0,25924.0,37756.0
3,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,351.0,30473.0,3917.0,2566.0,58723.0
5,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,,4759.0,18191.0,3430.0,61869.0
7,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,2789.0,24519.0,24437.0,65823.0,111832.0
9,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,2268.0,7349.0,144961.0,100432.0,76402.0


##### Immigration

In [59]:
immigrationPlusNew.show(5)

+-------+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+----------+-------+-------+-------+-------+----------+------+-------+-----------+-----+--------+
|  cicid|i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|count|  dtadfile|entdepa|entdepd|matflag|biryear|   dtaddto|gender|airline|     admnum|fltno|visatype|
+-------+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+----------+-------+-------+-------+-------+----------+------+-------+-----------+-----+--------+
|5748517| 2016|     4|   245|   438|    LOS|2016-04-30|      1|     CA|2016-05-08|    40|      1|    1|2016-04-30|      G|      O|      M|   1976|2016-10-29|     F|     QF|94953870030|00011|      B1|
|5748518| 2016|     4|   245|   438|    LOS|2016-04-30|      1|     NV|2016-05-17|    32|      1|    1|2016-04-30|      G|      O|      M|   1984|2016-10-29|     F|     VA|94955622830|00007|      B1|


In [97]:
%%sql
DROP TABLE IF EXISTS immigration;
CREATE TABLE IF NOT EXISTS immigration
               (cicid VARCHAR PRIMARY KEY NOT NULL,
               i94yr INT NOT NULL, 
               i94mon INT NOT NULL, 
               i94cit VARCHAR NOT NULL,
               i94res VARCHAR NOT NULL,
               i94port VARCHAR NOT NULL,
               arrdate DATE NOT NULL,
               i94mode INT NOT NULL,
               i94addr VARCHAR,
               depdate DATE,
               i94bir INT,
               i94visa INT NOT NULL,
               count INT NOT NULL,
               dtadfile DATE NOT NULL,
               entdepa VARCHAR NOT NULL,
               entdepd VARCHAR,
               matflag VARCHAR,
               biryear INT,
               dtaddto DATE,
               gender VARCHAR,
               airline VARCHAR,
               admnum VARCHAR NOT NULL,
               fltno VARCHAR,
               visatype VARCHAR NOT NULL);

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
Done.
Done.


[]

In [61]:
# # read the data 
# immigration=spark.read.parquet("s3a://udacity-capstone-project-bucket-test/immigration.parquet/*.parquet")

In [62]:
# # count
# immigration.count()

2768864

In [63]:
# # top 5
# immigration.show(5)

+-------+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+----------+-------+-------+-------+-------+----------+------+-------+-----------+-----+--------+
|  cicid|i94yr|i94mon|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|i94visa|count|  dtadfile|entdepa|entdepd|matflag|biryear|   dtaddto|gender|airline|     admnum|fltno|visatype|
+-------+-----+------+------+------+-------+----------+-------+-------+----------+------+-------+-----+----------+-------+-------+-------+-------+----------+------+-------+-----------+-----+--------+
|5748517| 2016|     4|   245|   438|    LOS|2016-04-30|      1|     CA|2016-05-08|    40|      1|    1|2016-04-30|      G|      O|      M|   1976|2016-10-29|     F|     QF|94953870030|00011|      B1|
|5748518| 2016|     4|   245|   438|    LOS|2016-04-30|      1|     NV|2016-05-17|    32|      1|    1|2016-04-30|      G|      O|      M|   1984|2016-10-29|     F|     VA|94955622830|00007|      B1|


In [64]:
# data schema
immigration.printSchema()

root
 |-- cicid: string (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: date (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- dtadfile: date (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [98]:
query = """COPY immigration
FROM 's3://udacity-capstone-project-bucket-test/immigration.parquet/'
credentials 'aws_access_key_id={};aws_secret_access_key={}'
PARQUET;""".format(KEY, SECRET)
%sql $query

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
Done.


[]

In [99]:
%%sql
SELECT * FROM immigration LIMIT 5;

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype
5748518,2016,4,245,438,LOS,2016-04-30,1,NV,2016-05-17,32,1,1,2016-04-30,G,O,M,1984,2016-10-29,F,VA,94955622830,7,B1
5748519,2016,4,245,438,LOS,2016-04-30,1,WA,2016-05-08,29,1,1,2016-04-30,G,O,M,1987,2016-10-29,M,DL,94956406530,40,B1
5748520,2016,4,245,438,LOS,2016-04-30,1,WA,2016-05-14,29,1,1,2016-04-30,G,O,M,1987,2016-10-29,F,DL,94956451430,40,B1
5748521,2016,4,245,438,LOS,2016-04-30,1,WA,2016-05-14,28,1,1,2016-04-30,G,O,M,1988,2016-10-29,M,DL,94956388130,40,B1
5748522,2016,4,245,464,HHW,2016-04-30,1,HI,2016-05-05,57,2,1,2016-04-30,G,O,M,1959,2016-10-29,M,NZ,94981802830,10,B2


##### Temperature

In [100]:
%%sql
DROP TABLE IF EXISTS temperature;
CREATE TABLE IF NOT EXISTS temperature
               (temp_id INT IDENTITY(0, 1) PRIMARY KEY,
               dt DATE NOT NULL,
               AverageTemperature FLOAT4 NOT NULL,
               AverageTemperatureUncertainty FLOAT4 NOT NULL,
               State VARCHAR NOT NULL
               );

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
Done.
Done.


[]

In [101]:
query = """COPY temperature 
FROM 's3://udacity-capstone-project-bucket-test/temperatureUS.csv'
credentials 'aws_access_key_id={};aws_secret_access_key={}'
CSV;""".format(KEY, SECRET)
%sql $query

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
Done.


[]

In [89]:
# %%sql
# SELECT * FROM stl_load_errors;

In [102]:
%%sql
SELECT * FROM temperature LIMIT 5;

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


temp_id,dt,averagetemperature,averagetemperatureuncertainty,state
0,2003-01-01,4.8,0.1,Alabama
2,2003-02-01,9.1,0.2,Alabama
4,2003-03-01,14.6,0.2,Alabama
6,2003-04-01,17.8,0.2,Alabama
8,2003-05-01,22.6,0.2,Alabama


##### Normalization

dim_airports

In [106]:
%%sql
DROP TABLE IF EXISTS dim_airports;
CREATE TABLE dim_airports AS
    SELECT DISTINCT tb.region,  totalAirport FROM
        (SELECT region, COUNT(ident) AS totalAirport
        FROM airports
        GROUP BY 1) tb
        WHERE region IN (SELECT DISTINCT "State Code" FROM cities)
        AND region IN (SELECT DISTINCT i94addr FROM immigration);

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
Done.
Done.


[]

In [107]:
%%sql
SELECT * FROM dim_airports LIMIT 5

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


region,totalairport
FL,523
MO,411
NY,400
OR,357
SD,162


dim_cities

In [109]:
%%sql
DROP TABLE IF EXISTS dim_cities;
CREATE TABLE dim_cities AS
    SELECT DISTINCT tb.* FROM
        (SELECT "State Code", 
        ROUND(AVG("Median Age"), 1) AS "Median Age",
        SUM("Male Population") AS "Male Population",
        SUM("Female Population") AS "Female Population",
        SUM("Total Population") AS "Total Population",
        SUM("Number of Veterans") AS "Number of Veterans",
        SUM("Foreign-born") AS "Foreign-born",
        ROUND(AVG("Average Household Size"), 1) AS "Average Household Size",
        ROUND(SUM("American Indian and Alaska Native"), 1) AS "American Indian and Alaska Native",
        ROUND(SUM("Asian"), 1) AS "Asian",
        ROUND(SUM("Black or African-American"), 1) AS "Black or African-American",
        ROUND(SUM("Hispanic or Latino"), 1) AS "Hispanic or Latino",
        ROUND(SUM("White"), 1) AS "White" FROM cities
        GROUP BY "State Code") tb
        WHERE "State Code" IN (SELECT DISTINCT region FROM airports)
        AND "State Code" IN (SELECT DISTINCT i94addr FROM immigration);

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
Done.
Done.


[]

In [110]:
%%sql
SELECT * FROM dim_cities LIMIT 5

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


state code,median age,male population,female population,total population,number of veterans,foreign-born,average household size,american indian and alaska native,asian,black or african-american,hispanic or latino,white
FL,39.0,3236773,3487375,6724148,372997,1684897,2.8,43541.0,241587.5,1517568.5,1906897.5,4439764.5
SC,34.2,260944,272713,533657,33463,27744,2.5,3492.0,19852.3,152653.3,29162.7,346412.7
WA,35.3,1245605,1254502,2500107,153126,440962,2.6,60248.0,346995.0,187584.0,324070.0,1913484.0
ID,34.8,199103,199780,398883,26380,28126,2.7,6705.0,13985.0,7822.0,48142.0,370314.0
MT,35.5,87707,93587,181294,13854,5977,2.3,9684.0,4165.0,3349.0,10000.0,169026.0


dim_temperature

The query below does not use a PIVOT function; it pivots the seasonal averages into columns with CASE expressions and self-joins instead.

In [111]:
%%sql
DROP TABLE IF EXISTS dim_temperature;
CREATE TABLE dim_temperature AS
    WITH season AS (
    SELECT 
        CASE 
        WHEN EXTRACT(MONTH FROM dt) IN (03, 04, 05) THEN 'Spring'
        WHEN EXTRACT(MONTH FROM dt) IN (06, 07, 08) THEN 'Summer'
        WHEN EXTRACT(MONTH FROM dt) IN (09, 10, 11) THEN 'Fall'
        WHEN EXTRACT(MONTH FROM dt) IN (12, 01, 02) THEN 'Winter'
        END Season, 
        CASE 
        WHEN State = 'Georgia (State)' THEN 'Georgia'
        WHEN State = 'District Of Columbia' THEN 'District of Columbia'
        ELSE State
        END AS State
        , 
        ROUND(AVG(AverageTemperature), 1) AS AverageTemperature 
        FROM temperature
        GROUP BY 1, 2
    )
    SELECT DISTINCT cities."State Code", FallAvgTemp, SummerAvgTemp, SpringAvgTemp, WinterAvgTemp FROM
        (SELECT State, 
        CASE WHEN Season = 'Fall' THEN AverageTemperature END AS FallAvgTemp FROM season) fall 
        JOIN (
        SELECT State, 
        CASE WHEN Season = 'Summer' THEN AverageTemperature END AS SummerAvgTemp FROM season 
        ) summer
        ON fall.State = Summer.State
        JOIN (
        SELECT State, 
        CASE WHEN Season = 'Spring' THEN AverageTemperature END AS SpringAvgTemp FROM season     
        ) spring
        ON fall.State = spring.State
        JOIN (
        SELECT State, 
        CASE WHEN Season = 'Winter' THEN AverageTemperature END AS WinterAvgTemp FROM season        
        ) winter
        ON fall.State = winter.State
        JOIN cities 
        ON fall.State = cities.State
        WHERE cities."State Code" IN (SELECT DISTINCT region FROM airports)
        AND cities."State Code" IN (SELECT DISTINCT i94addr FROM immigration)
        AND FallAvgTemp IS NOT NULL AND SummerAvgTemp IS NOT NULL AND SpringAvgTemp IS NOT NULL AND WinterAvgTemp IS NOT NULL
;

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
Done.
Done.


[]

In [112]:
%%sql
SELECT * FROM dim_temperature LIMIT 5

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


state code,fallavgtemp,summeravgtemp,springavgtemp,winteravgtemp
PA,11.8,21.5,9.7,-1.7
AR,17.3,26.8,16.6,5.6
RI,12.6,21.1,8.6,-0.7
DC,14.2,24.2,12.4,1.4
LA,20.8,28.1,20.0,10.9
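
An alternative way to pivot seasons into columns is conditional aggregation: one `AVG(CASE WHEN ...)` per season with a single `GROUP BY`, which avoids the self-joins. The sketch below runs against an in-memory SQLite database with made-up rows so that it is self-contained. It is not the notebook's Redshift query, and on Redshift the month would come from `EXTRACT(MONTH FROM dt)` rather than `strftime`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE temperature (dt TEXT, AverageTemperature REAL, State TEXT)")

# Made-up sample rows, for illustration only.
rows = [
    ("2003-01-01", 5.0, "Alabama"),
    ("2003-02-01", 9.0, "Alabama"),
    ("2003-04-01", 18.0, "Alabama"),
    ("2003-07-01", 27.0, "Alabama"),
    ("2003-10-01", 17.0, "Alabama"),
    ("2003-01-01", 6.0, "Georgia (State)"),
    ("2003-04-01", 17.0, "Georgia (State)"),
    ("2003-07-01", 26.0, "Georgia (State)"),
    ("2003-10-01", 18.0, "Georgia (State)"),
]
conn.executemany("INSERT INTO temperature VALUES (?, ?, ?)", rows)

query = """
WITH season AS (
    SELECT
        CASE
            WHEN CAST(strftime('%m', dt) AS INTEGER) IN (3, 4, 5) THEN 'Spring'
            WHEN CAST(strftime('%m', dt) AS INTEGER) IN (6, 7, 8) THEN 'Summer'
            WHEN CAST(strftime('%m', dt) AS INTEGER) IN (9, 10, 11) THEN 'Fall'
            ELSE 'Winter'
        END AS Season,
        CASE WHEN State = 'Georgia (State)' THEN 'Georgia' ELSE State END AS State,
        AverageTemperature
    FROM temperature
)
SELECT State,
       ROUND(AVG(CASE WHEN Season = 'Fall' THEN AverageTemperature END), 1) AS FallAvgTemp,
       ROUND(AVG(CASE WHEN Season = 'Summer' THEN AverageTemperature END), 1) AS SummerAvgTemp,
       ROUND(AVG(CASE WHEN Season = 'Spring' THEN AverageTemperature END), 1) AS SpringAvgTemp,
       ROUND(AVG(CASE WHEN Season = 'Winter' THEN AverageTemperature END), 1) AS WinterAvgTemp
FROM season
GROUP BY State
ORDER BY State;
"""

# One row per state, one column per season.
result = conn.execute(query).fetchall()
for row in result:
    print(row)
```

Because `AVG` ignores NULLs, each CASE expression averages only the rows of its own season.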


fact_immigration

In [113]:
%%sql
DROP TABLE IF EXISTS fact_immigration;
CREATE TABLE fact_immigration AS
                SELECT * FROM 
                immigration  
                WHERE i94addr IN
                (SELECT DISTINCT "State Code" FROM cities)
                AND i94addr IN (SELECT DISTINCT region FROM airports);

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
Done.
Done.


[]

In [114]:
%%sql
SELECT * FROM fact_immigration LIMIT 5

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype
5901585,2016,4,111,111,CHM,2016-04-01,1,NY,,21.0,2,1,2016-05-02,A,,,1995.0,2016-06-30,F,AC,42339986533,,WT
5901589,2016,4,111,111,CHM,2016-04-01,9,NJ,,37.0,2,1,2016-05-02,A,,,1979.0,2016-06-29,M,,42309454033,,WT
5902170,2016,4,112,112,BLA,2016-04-01,9,OR,,,2,1,2016-05-02,A,,,,2016-06-29,M,,86558290728,,WT
5909168,2016,4,373,373,WPB,2016-04-01,9,FL,2016-04-04,33.0,1,1,2016-05-05,A,D,M,1983.0,2016-09-30,X,,49468112027,,B1
5910502,2016,4,464,438,PEV,2016-04-01,1,FL,,33.0,1,1,2016-05-06,A,W,M,1983.0,2016-10-01,M,MM,12797272727,4242.0,B1


<a id='check'></a>

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
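
As a minimal sketch of the first and third kinds of check, the snippet below counts rows, duplicate keys, and NULL keys for one table. It runs against an in-memory SQLite database rather than the Redshift cluster used in this notebook; the helper name `run_quality_checks` and the toy rows are assumptions made for illustration only.

```python
import sqlite3


def run_quality_checks(conn, table, key):
    """Return (row_count, duplicate_keys, null_keys) for one table.

    `table` and `key` are interpolated into the SQL text, so they must come
    from trusted code, never from user input.
    """
    cur = conn.cursor()
    row_count = cur.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    # Number of key values that appear on more than one row.
    duplicate_keys = cur.execute(
        f'SELECT COUNT(*) FROM (SELECT "{key}" FROM {table} '
        f'GROUP BY "{key}" HAVING COUNT(*) > 1)'
    ).fetchone()[0]
    # Number of rows whose key is missing.
    null_keys = cur.execute(
        f'SELECT COUNT(*) FROM {table} WHERE "{key}" IS NULL'
    ).fetchone()[0]
    return row_count, duplicate_keys, null_keys


# Toy data (hypothetical values): 'AK' is duplicated on purpose.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_airports (region TEXT, totalairport INTEGER)")
conn.executemany(
    "INSERT INTO dim_airports VALUES (?, ?)",
    [("AL", 197), ("AK", 10), ("AK", 11)],
)

result = run_quality_checks(conn, "dim_airports", "region")
print(result)  # (3, 1, 0)
```

A dimension table passes when the row count is non-zero and both the duplicate and NULL counts are zero.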
 
Run Quality Checks

##### Check the relationship among the normalized tables

In [115]:
%%sql
SELECT DISTINCT i94addr, dc."State Code", da.region, dt."State Code" FROM fact_immigration f
LEFT JOIN dim_cities dc
ON f.i94addr = dc."State Code"
LEFT JOIN dim_airports da
ON f.i94addr = da.region
LEFT JOIN dim_temperature dt
ON f.i94addr = dt."State Code"
ORDER BY 1

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
48 rows affected.


i94addr,state code,region,state code_1
AK,AK,AK,AK
AL,AL,AL,AL
AR,AR,AR,AR
AZ,AZ,AZ,AZ
CA,CA,CA,CA
CO,CO,CO,CO
CT,CT,CT,CT
DC,DC,DC,DC
DE,DE,DE,DE
FL,FL,FL,FL


In [116]:
%%sql
SELECT COUNT(*), MIN("State Code"), MAX("State Code") FROM dim_cities

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count,min,max
48,AK,WI


In [117]:
%%sql
SELECT COUNT(*), MIN("State Code"), MAX("State Code") FROM dim_temperature

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count,min,max
48,AK,WI


In [118]:
%%sql
SELECT COUNT(DISTINCT region), MIN(region), MAX(region) FROM dim_airports

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count,min,max
48,AK,WI


In [120]:
%%sql
SELECT COUNT(DISTINCT i94addr), MIN(i94addr), MAX(i94addr) FROM fact_immigration

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count,min,max
48,AK,WI


All the state codes in the dimension tables and fact table can be matched. 
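
The same relationship can also be checked with an anti-join that returns only the unmatched state codes, so an empty result means every fact row has a matching dimension row. The sketch below uses an in-memory SQLite database and toy rows; the code `'ZZ'` is a made-up value added to show what a failure looks like.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE fact_immigration (cicid INTEGER, i94addr TEXT);
    CREATE TABLE dim_cities ("State Code" TEXT);
    -- Toy rows; 'ZZ' is a hypothetical code with no dimension row.
    INSERT INTO fact_immigration VALUES (1, 'NY'), (2, 'NJ'), (3, 'FL'), (4, 'ZZ');
    INSERT INTO dim_cities VALUES ('NY'), ('NJ'), ('FL');
    """
)

# Keep only the fact state codes for which the LEFT JOIN found no match.
orphans = conn.execute(
    """
    SELECT DISTINCT f.i94addr
    FROM fact_immigration f
    LEFT JOIN dim_cities dc ON f.i94addr = dc."State Code"
    WHERE dc."State Code" IS NULL
    ORDER BY 1
    """
).fetchall()

print(orphans)  # [('ZZ',)]
```

Repeating the query for `dim_airports` and `dim_temperature` would cover the other two dimensions.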

##### Check the normalized tables with the original tables

**cities**

In [122]:
%%sql
SELECT * FROM dim_cities
WHERE "State Code" = 'AL'

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


state code,median age,male population,female population,total population,number of veterans,foreign-born,average household size,american indian and alaska native,asian,black or african-american,hispanic or latino,white
AL,36.2,497248,552381,1049629,71543,52154,2.4,8084.0,28769.0,521068.0,39313.0,498920.0


In [123]:
# original cities
cities[cities["State Code"] == "AL"]

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
67,Montgomery,Alabama,35.4,94582,106004,200586,14955,9337,2.41,AL,White,73545
68,Huntsville,Alabama,38.1,91764,97350,189114,16637,12691,2.18,AL,American Indian and Alaska Native,1755
101,Huntsville,Alabama,38.1,91764,97350,189114,16637,12691,2.18,AL,Hispanic or Latino,10887
212,Birmingham,Alabama,35.6,102122,112789,214911,13212,8258,2.21,AL,Asian,1500
375,Huntsville,Alabama,38.1,91764,97350,189114,16637,12691,2.18,AL,Black or African-American,61561
479,Dothan,Alabama,38.9,32172,35364,67536,6334,1699,2.59,AL,Asian,1175
500,Dothan,Alabama,38.9,32172,35364,67536,6334,1699,2.59,AL,Hispanic or Latino,1704
552,Montgomery,Alabama,35.4,94582,106004,200586,14955,9337,2.41,AL,American Indian and Alaska Native,1277
598,Tuscaloosa,Alabama,29.1,47293,51045,98338,3647,4706,2.67,AL,Asian,2733


The "dim_cities" row for AL is consistent with the original "cities" table: it rolls the city-level rows up into a single state-level row.

**airports**

In [124]:
%%sql
SELECT * FROM dim_airports 
WHERE region = 'AL'

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


region,totalairport
AL,197


In [163]:
# original airport
(airport[(airport.iso_region == 'US-AL') &
        ~(airport.type == "closed") & 
        airport.type.str.contains("airport") & 
        ~airport.elevation_ft.isna()].
        drop(["continent","iata_code", "gps_code", "local_code", "municipality"], axis=1).drop_duplicates())

Unnamed: 0,ident,type,name,elevation_ft,iso_country,iso_region,coordinates
3,00AL,small_airport,Epps Airpark,820,US,US-AL,"-86.77030181884766, 34.86479949951172"
65,01AL,small_airport,Ware Island Airport,344,US,US-AL,"-86.51390075683594, 32.94599914550781"
129,02AL,small_airport,Bass Field,61,US,US-AL,"-87.76439666748047, 30.37150001525879"
605,0AL1,small_airport,Resort Airport,97,US,US-AL,"-87.65689849853516, 30.443899154663086"
609,0AL5,small_airport,Flomaton Airport,247,US,US-AL,"-87.25279998779297, 31.03219985961914"
613,0AL9,small_airport,Wilson Creek Airport,590,US,US-AL,"-87.63249969482422, 34.849998474121094"
791,0J0,small_airport,Abbeville Municipal Airport,468,US,US-AL,"-85.23880004882812, 31.600200653076172"
1500,15A,small_airport,Mark Reynolds/North Mobile County Airport,60,US,US-AL,"-87.98059844970703, 30.91320037841797"
1786,1AL2,small_airport,Tri-L Acres Airport,580,US,US-AL,"-86.59750366210938, 33.248600006103516"
1788,1AL4,small_airport,Elsanor Airport,180,US,US-AL,"-87.55940246582031, 30.544700622558594"


The number of airports in "dim_airports" matches the number in the original "airport" table.

**temperature**

In [164]:
%%sql
SELECT * FROM dim_temperature
WHERE "State Code" = 'AL'

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


state code,fallavgtemp,summeravgtemp,springavgtemp,winteravgtemp
AL,18.6,26.9,17.9,8.4


In [165]:
# original temperature table
spring = round(temperature[(temperature.dt.str.contains("-03-|-04-|-05-")) & (temperature.State == "Alabama")]["AverageTemperature"].astype(float).mean(), 1)
summer = round(temperature[(temperature.dt.str.contains("-06-|-07-|-08-")) & (temperature.State == "Alabama")]["AverageTemperature"].astype(float).mean(), 1)
autumn = round(temperature[(temperature.dt.str.contains("-09-|-10-|-11-")) & (temperature.State == "Alabama")]["AverageTemperature"].astype(float).mean(), 1)
winter = round(temperature[(temperature.dt.str.contains("-12-|-01-|-02-")) & (temperature.State == "Alabama")]["AverageTemperature"].astype(float).mean(), 1)
print("Fall:{}\tSummer:{}\tSpring:{}\tWinter:{}".format(autumn, summer, spring, winter))

Fall:17.4	Summer:26.0	Spring:17.0	Winter:8.0


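The temperatures in "dim_temperature" are slightly higher (by 0.4 to 1.2 degrees) than the averages computed from the original "temperature" table. One likely reason is that the check above averages over all years, while the data dictionary describes "dim_temperature" as covering 2003 onwards.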
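
One possible source of a gap between the two sets of averages is the year range: the pandas check above averages over every year in the file, while the data dictionary describes "dim_temperature" as averaging from 2003 onwards. The sketch below shows how a year cutoff changes seasonal means. The function name `seasonal_means`, the `start_year` parameter, and the toy records are assumptions for illustration; dates are assumed to be `YYYY-MM-DD` strings.

```python
from statistics import mean

# Month-to-season mapping used by the checks above.
SEASON_BY_MONTH = {
    12: "winter", 1: "winter", 2: "winter",
    3: "spring", 4: "spring", 5: "spring",
    6: "summer", 7: "summer", 8: "summer",
    9: "fall", 10: "fall", 11: "fall",
}


def seasonal_means(records, start_year=None):
    """Average temperature per season, optionally ignoring earlier years.

    records: iterable of (date_string, temperature) pairs.
    """
    buckets = {}
    for dt, temp in records:
        year, month = int(dt[:4]), int(dt[5:7])
        if start_year is not None and year < start_year:
            continue
        buckets.setdefault(SEASON_BY_MONTH[month], []).append(float(temp))
    return {season: round(mean(vals), 1) for season, vals in buckets.items()}


# Toy records (hypothetical values), not taken from the real dataset.
toy = [
    ("1990-07-01", 25.0), ("2005-07-01", 27.0),
    ("1990-01-01", 7.0), ("2005-01-01", 9.0),
]

all_years = seasonal_means(toy)
recent = seasonal_means(toy, start_year=2003)
print(all_years)  # {'summer': 26.0, 'winter': 8.0}
print(recent)     # {'summer': 27.0, 'winter': 9.0}
```

Running the original check with the same cutoff as the ETL would make the two results directly comparable.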

**immigration**

In [167]:
# %%sql
# SELECT * FROM fact_immigration LIMIT 5;

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
5 rows affected.


cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype
5901585,2016,4,111,111,CHM,2016-04-01,1,NY,,21.0,2,1,2016-05-02,A,,,1995.0,2016-06-30,F,AC,42339986533,,WT
5901589,2016,4,111,111,CHM,2016-04-01,9,NJ,,37.0,2,1,2016-05-02,A,,,1979.0,2016-06-29,M,,42309454033,,WT
5902170,2016,4,112,112,BLA,2016-04-01,9,OR,,,2,1,2016-05-02,A,,,,2016-06-29,M,,86558290728,,WT
5909168,2016,4,373,373,WPB,2016-04-01,9,FL,2016-04-04,33.0,1,1,2016-05-05,A,D,M,1983.0,2016-09-30,X,,49468112027,,B1
5910502,2016,4,464,438,PEV,2016-04-01,1,FL,,33.0,1,1,2016-05-06,A,W,M,1983.0,2016-10-01,M,MM,12797272727,4242.0,B1


In [174]:
%%sql
SELECT COUNT(*) FROM fact_immigration
WHERE i94addr = 'AL'

 * postgresql://udacity-data-pipelines:***@redshift-cluster-1.cxdkjytbphwr.us-west-2.redshift.amazonaws.com:5439/dev
1 rows affected.


count
8036


In [169]:
# original immigration
immigration[immigration.i94addr == "AL"].count()

8188

The AL row count in "fact_immigration" (8036) is very close to the count in the original "immigration" table (8188).

In conclusion, the information in the normalized tables closely matches the information in the original tables.
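
To make "close" measurable, the two AL counts from the cells above (8188 in the original table, 8036 in the fact table) can be compared as a relative difference. This is a minimal sketch; the function name `relative_difference` and the 5% tolerance are assumptions, not thresholds defined by this project.

```python
def relative_difference(original_count, loaded_count):
    """Fraction of the original rows that is missing from the loaded table."""
    return (original_count - loaded_count) / original_count


TOLERANCE = 0.05  # hypothetical acceptance threshold

diff = relative_difference(8188, 8036)
print(f"{diff:.2%}")      # 1.86%
print(diff <= TOLERANCE)  # True
```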

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

In [392]:
# %%sql
# SELECT * FROM dim_cities LIMIT 5

* **dim_cities**:
    * "State Code" (PK): Two-letter US state code.
    * "Median Age": Median age of the population, from the US 2015 Census.
    * "Male Population": Male population, from the US 2015 Census.
    * "Female Population": Female population, from the US 2015 Census.
    * "Total Population": Total population, from the US 2015 Census.
    * "Number of Veterans": Number of veterans, from the US 2015 Census.
    * "Foreign-born": Foreign-born population, from the US 2015 Census.
    * "Average Household Size": Average household size, from the US 2015 Census.
    * "American Indian and Alaska Native": American Indian and Alaska Native population, from the US 2015 Census.
    * "Asian": Asian population, from the US 2015 Census.
    * "Black or African-American": Black or African-American population, from the US 2015 Census.
    * "Hispanic or Latino": Hispanic or Latino population, from the US 2015 Census.
    * "White": White population, from the US 2015 Census.

In [393]:
# %%sql
# SELECT * FROM dim_airports LIMIT 5

* **dim_airports**:
    * "region" (PK): Code for states.
    * "totalAirport": Number of airports in the state (including small, medium, and large airports).

In [394]:
# %%sql
# SELECT * FROM dim_temperature LIMIT 5

* **dim_temperature**:
    * "State Code" (PK): Code for states.
    * "FallAvgTemp": Average temperature in Fall (Sep, Oct, Nov) from 2003 onwards.
    * "SummerAvgTemp": Average temperature in Summer (Jun, Jul, Aug) from 2003 onwards.
    * "SpringAvgTemp": Average temperature in Spring (Mar, Apr, May) from 2003 onwards.
    * "WinterAvgTemp": Average temperature in Winter (Dec, Jan, Feb) from 2003 onwards.

In [179]:
# %%sql
# SELECT * FROM fact_immigration LIMIT 5;

* **fact_immigration**:
    * "cicid" (PK): cic id for tourists.
    * "i94yr": I-94 filing year.
    * "i94mon": I-94 filing month.
    * "i94cit": Code for the country of citizenship.
    * "i94res": Code for the country of residence.
    * "i94port": port initials.
    * "arrdate": Arrival Date in the USA.
    * "i94mode": Mode of arrival: 1 = 'Air', 2 = 'Sea', 3 = 'Land', 9 = 'Not reported'.
    * "i94addr" (FK): US state codes.
    * "depdate": Departure Date from the USA.
    * "i94bir": Age of the respondent in years.
    * "i94visa": visa codes, 1 = Business, 2 = Pleasure, 3 = Student.
    * "count": Used for summary statistics.
    * "dtadfile": Date added to I-94 Files.
    * "visapost": Department of State post where the visa was issued.
    * "entdepa": Arrival Flag - admitted or paroled into the U.S.
    * "entdepd": Departure Flag - Departed, lost I-94 or is deceased.
    * "matflag": Match flag - Match of arrival and departure records.    
    * "biryear": 4 digit year of birth.
    * "dtaddto": Date to which admitted to U.S. (allowed to stay until).
    * "gender": Non-immigrant sex.
    * "airline": Airline used to arrive in the U.S.
    * "admnum": Admission Number.
    * "fltno": Flight number of the airline used to arrive in the U.S.
    * "visatype": Class of admission legally admitting the non-immigrant to temporarily stay in the U.S.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
    * For this project, we used Python, PySpark, SQL, AWS S3, and AWS Redshift.
        * Python handles small datasets easily, while PySpark scales to large ones.
        * SQL is a query language supported by many DBMSs, and it can also be used from Python and PySpark.
        * We use AWS S3 to keep a copy of all the staging files and copy them directly from S3 to Redshift.
* Propose how often the data should be updated and why.
    * The data should be updated monthly since the immigration data is collected monthly while the other data sources are refreshed less frequently.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
     * Use PySpark to build ETL processes and save all the data as parquet files in S3.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
     * Use Airflow to schedule the pipelines that connect the data sources, the data warehouse, and the dashboard.
 * The database needed to be accessed by 100+ people.
     * Create IAM User groups for the different users.