## US IMMIGRATION PATTERNS WITH RELATION TO TEMPERATURE AND CITY DEMOGRAPHICS

#### PROJECT SUMMARY

The goal of the project is to provide an organized database that supports studies of US immigration patterns.

#### Project Scope

This project combines data from several sources needed for the analysis.
The data is processed with Spark and saved as Parquet files on Amazon S3,
where it can later be read using schema on read.

#### Data Used for this Project

- **U.S. City Demographic Data**: comes from OpenDataSoft and includes data by city, state, age, population, veteran status and race [link](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
- **I94 Immigration Data**: comes from the US National Tourism and Trade Office and includes details on incoming immigrants and their ports of entry [link](https://travel.trade.gov/research/reports/i94/historical/2016.html)
- **World Temperature Data**: comes from Kaggle [link](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)
- **Airports**: comes from GitHub [link](https://raw.githubusercontent.com/L1fescape/airport-codes/master/airports.json)

### Data Access and Exploration

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

#### Explore Immigration data

In [4]:
us_immigration = spark.read.parquet("sas_data")

In [5]:
us_immigration.head()

Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')

In [6]:
us_immigration.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [7]:
us_immigration.show(10)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     NV|20591.0|  32.0|    1.0|  

#### Selecting relevant columns

In [8]:
us_immigration = us_immigration.select(['i94yr', 'i94mon', 'i94cit', 'i94port', 'i94mode', 'i94bir', 'arrdate', 'depdate', 'i94visa','gender'])
us_immigration.show(100)

+------+------+------+-------+-------+------+-------+-------+-------+------+
| i94yr|i94mon|i94cit|i94port|i94mode|i94bir|arrdate|depdate|i94visa|gender|
+------+------+------+-------+-------+------+-------+-------+-------+------+
|2016.0|   4.0| 245.0|    LOS|    1.0|  40.0|20574.0|20582.0|    1.0|     F|
|2016.0|   4.0| 245.0|    LOS|    1.0|  32.0|20574.0|20591.0|    1.0|     F|
|2016.0|   4.0| 245.0|    LOS|    1.0|  29.0|20574.0|20582.0|    1.0|     M|
|2016.0|   4.0| 245.0|    LOS|    1.0|  29.0|20574.0|20588.0|    1.0|     F|
|2016.0|   4.0| 245.0|    LOS|    1.0|  28.0|20574.0|20588.0|    1.0|     M|
|2016.0|   4.0| 245.0|    HHW|    1.0|  57.0|20574.0|20579.0|    2.0|     M|
|2016.0|   4.0| 245.0|    HHW|    1.0|  66.0|20574.0|20586.0|    2.0|     F|
|2016.0|   4.0| 245.0|    HHW|    1.0|  41.0|20574.0|20586.0|    2.0|     F|
|2016.0|   4.0| 245.0|    HOU|    1.0|  27.0|20574.0|20581.0|    2.0|     M|
|2016.0|   4.0| 245.0|    LOS|    1.0|  26.0|20574.0|20581.0|    2.0|     F|

### Checking if all ports are valid

In [9]:
us_immigration.select('i94port').dropDuplicates().show()

+-------+
|i94port|
+-------+
|    FMY|
|    BGM|
|    HEL|
|    DNS|
|    MOR|
|    FOK|
|    HVR|
|    SNA|
|    PTK|
|    CLG|
|    SPM|
|    OPF|
|    DLB|
|    ABS|
|    NAS|
|    MYR|
|    PVD|
|    OAK|
|    FAR|
|    OTT|
+-------+
only showing top 20 rows



### Some records have a null departure date; they can be filtered out (this is a judgment call that depends on the analysis performed on the data)

In [10]:
us_immigration_clean = us_immigration.where(us_immigration['depdate'].isNotNull())
us_immigration_clean.show(50)

+------+------+------+-------+-------+------+-------+-------+-------+------+
| i94yr|i94mon|i94cit|i94port|i94mode|i94bir|arrdate|depdate|i94visa|gender|
+------+------+------+-------+-------+------+-------+-------+-------+------+
|2016.0|   4.0| 245.0|    LOS|    1.0|  40.0|20574.0|20582.0|    1.0|     F|
|2016.0|   4.0| 245.0|    LOS|    1.0|  32.0|20574.0|20591.0|    1.0|     F|
|2016.0|   4.0| 245.0|    LOS|    1.0|  29.0|20574.0|20582.0|    1.0|     M|
|2016.0|   4.0| 245.0|    LOS|    1.0|  29.0|20574.0|20588.0|    1.0|     F|
|2016.0|   4.0| 245.0|    LOS|    1.0|  28.0|20574.0|20588.0|    1.0|     M|
|2016.0|   4.0| 245.0|    HHW|    1.0|  57.0|20574.0|20579.0|    2.0|     M|
|2016.0|   4.0| 245.0|    HHW|    1.0|  66.0|20574.0|20586.0|    2.0|     F|
|2016.0|   4.0| 245.0|    HHW|    1.0|  41.0|20574.0|20586.0|    2.0|     F|
|2016.0|   4.0| 245.0|    HOU|    1.0|  27.0|20574.0|20581.0|    2.0|     M|
|2016.0|   4.0| 245.0|    LOS|    1.0|  26.0|20574.0|20581.0|    2.0|     F|

#### Exploring Temperature Data 

In [11]:
temp_data = spark.read.format("csv").option("header", "true").load('GlobalLandTemperaturesByCity.csv')

In [12]:
temp_data.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [13]:
temp_data.show(100)

+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|             10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01| 14.050999999999998|        

#### Dropping rows with a null AverageTemperature, keeping only US cities, and dropping duplicate cities

In [14]:
temp_data_clean = temp_data.where(temp_data['AverageTemperature'].isNotNull()).where(temp_data['Country'] == 'United States').dropDuplicates(['City'])
temp_data_clean.show(100)

+----------+-------------------+-----------------------------+----------------+-------------+--------+---------+
|        dt| AverageTemperature|AverageTemperatureUncertainty|            City|      Country|Latitude|Longitude|
+----------+-------------------+-----------------------------+----------------+-------------+--------+---------+
|1743-11-01|             13.918|                        2.234|      Charleston|United States|  32.95N|   79.47W|
|1849-01-01|             13.116|           2.5860000000000003|          Corona|United States|  32.95N|  117.77W|
|1758-03-01|             19.761|                        3.464|       Hollywood|United States|  26.52N|   80.60W|
|1758-03-01|             19.761|                        3.464|   Coral Springs|United States|  26.52N|   80.60W|
|1828-01-01|            -17.738|                        3.468|       Anchorage|United States|  61.88N|  151.13W|
|1743-11-01|              3.264|                        1.665|       Allentown|United States|  4

### Exploring Demographics

In [15]:
demograph_data = spark.read.format("csv").option("header", "true").load('us-cities-demographics.csv',sep=';')

In [16]:
demograph_data.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [17]:
demograph_data.show(10)

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|       Alabama|      38.5|          3

### Check if there are duplicate cities

In [18]:
demograph_data.orderBy('City').show(20)

+-------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   City|     State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+-------+----------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|Abilene|     Texas|      31.3|          65212|            60664|          125876|              9367|        8129|                  2.64|        TX|American Indian a...|  1813|
|Abilene|     Texas|      31.3|          65212|            60664|          125876|              9367|        8129|                  2.64|        TX|  Hispanic or Latino| 33222|
|Abilene|     Texas|      31.3|          65212|            60664|          125876|              9367|        8129| 

### Remove duplicate cities (the file has one row per city and race group)

In [19]:
demograph_data_clean = demograph_data.dropDuplicates(['City'])
demograph_data_clean.select('City','State','State Code').orderBy('City').show()

+-----------------+------------+----------+
|             City|       State|State Code|
+-----------------+------------+----------+
|          Abilene|       Texas|        TX|
|            Akron|        Ohio|        OH|
|          Alafaya|     Florida|        FL|
|          Alameda|  California|        CA|
|           Albany|     Georgia|        GA|
|      Albuquerque|  New Mexico|        NM|
|       Alexandria|    Virginia|        VA|
|         Alhambra|  California|        CA|
|            Allen|Pennsylvania|        PA|
|         Amarillo|       Texas|        TX|
|             Ames|        Iowa|        IA|
|          Anaheim|  California|        CA|
|        Anchorage|      Alaska|        AK|
|        Ann Arbor|    Michigan|        MI|
|          Antioch|  California|        CA|
|     Apple Valley|  California|        CA|
|         Appleton|   Wisconsin|        WI|
|     Arden-Arcade|  California|        CA|
|        Arlington|       Texas|        TX|
|Arlington Heights|    Illinois|

### Exploring Airport Codes

In [20]:
# source https://raw.githubusercontent.com/L1fescape/airport-codes/master/airports.json
airports = spark.read.json('airports.json')

In [21]:
airports.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- altitude: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- id: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- name: string (nullable = true)
 |-- timezone: string (nullable = true)
 |-- tz: string (nullable = true)



In [22]:
airports = airports.filter(airports['city'].isNotNull())
airports.show()

+---------------+--------+--------------+----------------+---+----+----+---+---------+----------+--------------------+--------+--------------------+
|_corrupt_record|altitude|          city|         country|dst|iata|icao| id| latitude| longitude|                name|timezone|                  tz|
+---------------+--------+--------------+----------------+---+----+----+---+---------+----------+--------------------+--------+--------------------+
|           null|    5282|        Goroka|Papua New Guinea|  U| GKA|AYGA|  1|-6.081689|145.391881|              Goroka|      10|Pacific/Port_Moresby|
|           null|      20|        Madang|Papua New Guinea|  U| MAG|AYMD|  2|-5.207083|  145.7887|              Madang|      10|Pacific/Port_Moresby|
|           null|    5388|   Mount Hagen|Papua New Guinea|  U| HGU|AYMH|  3|-5.826789|144.295861|         Mount Hagen|      10|Pacific/Port_Moresby|
|           null|     239|        Nadzab|Papua New Guinea|  U| LAE|AYNZ|  4|-6.569828|146.726242|         

### Get only US airports

In [23]:
airports.where(airports['country'] == 'United States').show()

+---------------+--------+-------------+-------------+---+----+----+----+----------+------------+--------------------+--------+-------------------+
|_corrupt_record|altitude|         city|      country|dst|iata|icao|  id|  latitude|   longitude|                name|timezone|                 tz|
+---------------+--------+-------------+-------------+---+----+----+----+----------+------------+--------------------+--------+-------------------+
|           null|     842|  Greencastle|United States|  U| 4I7|  \N|6891|39.6335556| -86.8138056|Putnam County Air...|      -5|   America/New_York|
|           null|     748|     Dowagiac|United States|  U| C91|  \N|6890|41.9929342| -86.1280125|Dowagiac Municipa...|      -5|   America/New_York|
|           null|     799|    Cambridge|United States|  U| CDI|  \N|6889|39.9750278| -81.5775833|Cambridge Municip...|      -5|   America/New_York|
|           null|     725| Sturgeon Bay|United States|  U| SUE|  \N|6885|44.8436667| -87.4215556|Door County Che

In [24]:
# Some airports have no IATA code; they are probably small airports for local flights
airports.where(airports['country'] == 'United States').where(airports['iata'] == '').count()

238

In [25]:
airports = airports.where(airports['country'] == 'United States').where(airports['iata'] != '')
airports.show()

+---------------+--------+-------------+-------------+---+----+----+----+----------+------------+--------------------+--------+-------------------+
|_corrupt_record|altitude|         city|      country|dst|iata|icao|  id|  latitude|   longitude|                name|timezone|                 tz|
+---------------+--------+-------------+-------------+---+----+----+----+----------+------------+--------------------+--------+-------------------+
|           null|     842|  Greencastle|United States|  U| 4I7|  \N|6891|39.6335556| -86.8138056|Putnam County Air...|      -5|   America/New_York|
|           null|     748|     Dowagiac|United States|  U| C91|  \N|6890|41.9929342| -86.1280125|Dowagiac Municipa...|      -5|   America/New_York|
|           null|     799|    Cambridge|United States|  U| CDI|  \N|6889|39.9750278| -81.5775833|Cambridge Municip...|      -5|   America/New_York|
|           null|     725| Sturgeon Bay|United States|  U| SUE|  \N|6885|44.8436667| -87.4215556|Door County Che

In [26]:
airports_cleaned = airports.select('city','iata','name').orderBy('iata')
airports_cleaned.show()

+-------------+----+--------------------+
|         city|iata|                name|
+-------------+----+--------------------+
|   Youngstown| 04G|   Lansdowne Airport|
|     Tuskegee| 06A|Moton Field Munic...|
|   Schaumburg| 06C| Schaumburg Regional|
|   Middletown| 06N|     Randall Airport|
|Jekyll Island| 09J|Jekyll Island Air...|
| Elizabethton| 0A9|Elizabethton Muni...|
|        Bryan| 0G6|Williams County A...|
| Seneca Falls| 0G7|Finger Lakes Regi...|
| Stewartstown| 0P2|Shoestring Aviati...|
|Port Townsend| 0S9|Jefferson County ...|
|  Churchville| 0W3|Harford County Ai...|
|    Greenwood| 10C|  Galt Field Airport|
|      Bucyrus| 17G|Port Bucyrus-Craw...|
|    Jefferson| 19A|Jackson County Ai...|
|   Copperhead| 1A3|Martin Campbell F...|
|    Mansfield| 1B9| Mansfield Municipal|
|    Hollister| 1C9|Frazier Lake Airpark|
|  Bolingbrook| 1CS|Clow Internationa...|
|         Kent| 1G3|  Kent State Airport|
|Peach Springs| 1G4|Grand Canyon West...|
+-------------+----+--------------

In [27]:
us_port_codes = us_immigration_clean.dropDuplicates(['i94port'])
us_port_codes.join(airports_cleaned,us_port_codes['i94port'] == airports_cleaned['iata'] ).show(200)

+------+------+------+-------+-------+------+-------+-------+-------+------+--------------------+----+--------------------+
| i94yr|i94mon|i94cit|i94port|i94mode|i94bir|arrdate|depdate|i94visa|gender|                city|iata|                name|
+------+------+------+-------+-------+------+-------+-------+-------+------+--------------------+----+--------------------+
|2016.0|   4.0| 582.0|    BGM|    1.0|  56.0|20559.0|20599.0|    1.0|     M|          Binghamton| BGM|Greater Binghamto...|
|2016.0|   4.0| 582.0|    FMY|    1.0|  43.0|20559.0|20643.0|    2.0|     F|          Fort Myers| FMY|            Page Fld|
|2016.0|   4.0| 582.0|    FOK|    1.0|  37.0|20574.0|20576.0|    2.0|     M|  West Hampton Beach| FOK|  Francis S Gabreski|
|2016.0|   4.0| 148.0|    HVR|    3.0|  68.0|20565.0|20576.0|    2.0|     F|               Havre| HVR|       Havre City Co|
|2016.0|   4.0| 135.0|    PTK|    1.0|  66.0|20549.0|20551.0|    1.0|     M|             Pontiac| PTK|    Oakland Co. Intl|
|2016.0|

### Conceptual Data Model

![image](star_schema.png)

The data was modeled using the star schema with:

**Fact Table**:  
 - fact_immigration
 
**Dimension Tables**:  
 - dim_demographics 
 - dim_city  
 - dim_temperature  
 - dim_visa  

### Data Pipeline Steps

##### Create dim_temperature Table 
1. Read data from `GlobalLandTemperaturesByCity.csv` file  
2. Drop rows with a null average temperature  
3. Keep only rows for the United States  
4. Drop duplicates by city  
5. Select relevant columns according to data model  
6. Save to parquet files  


##### Create dim_demographics Table 
1. Read data from `us-cities-demographics.csv` file  
2. Drop duplicates by city  
3. Select relevant columns according to data model  
4. Save to parquet files  


##### Create dim_city Table
This table is built from the same deduplicated data frame that dim_demographics is created from.
1. Select relevant columns according to data model  
2. Save to parquet files  


##### Create fact_immigration Table
1. Read data from `sas_data`
2. Drop rows with a null departure date
3. Join with the US airport data on `i94port` = `iata` to get the city as a column
4. Select relevant columns according to data model
5. Rename columns according to data model
6. Save to parquet files  

##### Create dim_visa Table
This table is built from the immigration data frame after the airport join (the same data frame used for fact_immigration).
1. Select the visa code (`i94visa`) and visa type (`visatype`) columns and rename them according to data model
2. Drop duplicate code/type pairs
3. Save to parquet files

### Pipelines to Model the Data 

In [28]:
import os
import configparser

from pyspark.sql.functions import col

config = configparser.ConfigParser()
config.read('dl.cfg')

# Make the AWS credentials from dl.cfg available as environment variables
os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

output_path = "s3a://karikari-udacity/capstone-outputs/"
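Changes made to `os.environ` from the notebook only reach processes started afterwards. Here, the SparkSession and the JVM that performs the S3 writes were created in an earlier cell, so the S3A connector may not see these variables. A sketch of an alternative is to pass the keys as Hadoop properties when the session is first built. It assumes the same `[AWS]` section in `dl.cfg`, and `s3a_options` and `build_session` are hypothetical names. Because `getOrCreate()` reuses an existing session, these options only take effect if they are applied before the first session in the kernel is created:

```python
import configparser


def s3a_options(config_path: str) -> dict:
    """Read AWS keys from a dl.cfg-style file and map them to S3A properties.

    Assumes an [AWS] section with AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
    """
    config = configparser.ConfigParser()
    if not config.read(config_path):  # read() returns the files it parsed
        raise FileNotFoundError(config_path)
    aws = config["AWS"]
    # Spark copies spark.hadoop.* settings into the Hadoop configuration
    return {
        "spark.hadoop.fs.s3a.access.key": aws["AWS_ACCESS_KEY_ID"],
        "spark.hadoop.fs.s3a.secret.key": aws["AWS_SECRET_ACCESS_KEY"],
    }


def build_session(options: dict):
    """Create (or reuse) a SparkSession with the given options."""
    # Imported here so that s3a_options can be used without Spark installed
    from pyspark.sql import SparkSession

    builder = SparkSession.builder
    for key, value in options.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Usage, in place of the first cell:
# spark = build_session(s3a_options("dl.cfg"))
```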




In [37]:
# Create dim_temperature step

temp_data = spark.read.format("csv").option("header", "true").load('GlobalLandTemperaturesByCity.csv')
temp_data_clean = temp_data.where(temp_data['AverageTemperature'].isNotNull()) \
    .where(temp_data['Country'] == 'United States') \
    .dropDuplicates(['City']) \
    .select(
        col('City').alias('city'),
        col('AverageTemperature').alias('average_temperature'),
        col('Country').alias('country'),
        col('Latitude').alias('latitude'),
        col('Longitude').alias('longitude'),
    )

temp_data_clean.show(3)
temp_data_clean.printSchema()
temp_data_clean.write.parquet(output_path + 'dim_temperature.parquet', 'overwrite')

+----------+-------------------+-------------+--------+---------+
|      city|average_temperature|      country|latitude|longitude|
+----------+-------------------+-------------+--------+---------+
|Charleston|             13.918|United States|  32.95N|   79.47W|
|    Corona|             13.116|United States|  32.95N|  117.77W|
| Hollywood|             19.761|United States|  26.52N|   80.60W|
+----------+-------------------+-------------+--------+---------+
only showing top 3 rows

root
 |-- city: string (nullable = true)
 |-- average_temperature: string (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [44]:
# create dim_demographics step

demograph_data = spark.read.format("csv").option("header", "true").load('us-cities-demographics.csv',sep=';')

demograph_data_clean = demograph_data.dropDuplicates(['City'])
demograph_data_table= demograph_data_clean.select(
    col('City').alias('city'),
    col('State').alias('state'),
    col('Median Age').alias('median'),
    col('Male Population').alias('males'),
    col('Female Population').alias('females'),
    col('Total Population').alias('total_population'),
    col('Number of Veterans').alias('num_of_veterans'),
    col('Foreign-born').alias('foreign_born'),
    col('Average Household Size').alias('avg_house_size'))
demograph_data_table.show(3)
demograph_data_table.printSchema()
demograph_data_table.write.parquet(output_path+'dim_demographics.parquet','overwrite')

+------------+-------------+------+-----+-------+----------------+---------------+------------+--------------+
|        city|        state|median|males|females|total_population|num_of_veterans|foreign_born|avg_house_size|
+------------+-------------+------+-----+-------+----------------+---------------+------------+--------------+
|Saint George|         Utah|  37.3|38732|  41475|           80207|           4443|        4824|          2.81|
|       Tyler|        Texas|  33.9|50422|  53283|          103705|           4813|        8225|          2.59|
|   Worcester|Massachusetts|  34.9|90951|  93855|          184806|           9408|       36907|          2.43|
+------------+-------------+------+-----+-------+----------------+---------------+------------+--------------+
only showing top 3 rows

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median: string (nullable = true)
 |-- males: string (nullable = true)
 |-- females: string (nullable = true)
 |-- t

In [45]:
# create dim_city

city = demograph_data_clean.select(
    'city',
    'state',
    col('State Code').alias('state_code'))
city.show(3)
city.printSchema()
city.write.parquet(output_path+'dim_city.parquet','overwrite')

+------------+-------------+----------+
|        city|        state|state_code|
+------------+-------------+----------+
|Saint George|         Utah|        UT|
|       Tyler|        Texas|        TX|
|   Worcester|Massachusetts|        MA|
+------------+-------------+----------+
only showing top 3 rows

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)



In [46]:
# Create fact_immigration
us_immigration = spark.read.parquet("sas_data")
us_immigration_clean = us_immigration.where(us_immigration['depdate'].isNotNull())

# create airport data for joining
airports = spark.read.json('airports.json')
airports_cleaned= airports.select('city','iata','name').orderBy('iata')\
.filter(airports['city'].isNotNull())\
.where(airports['country'] == 'United States')\
.where(airports['iata'] != '')

us_immigration_join = us_immigration_clean.join(airports_cleaned,us_immigration_clean['i94port'] == airports_cleaned['iata'] )
us_immigration_table = us_immigration_join.select(
    col('i94yr').alias('year'),
    col('i94port').alias('port_iata'),
    'city',
    col('arrdate').alias('arrival_date'),
    col('depdate').alias('departure_date'),
    col('i94bir').alias('age'),
    col('i94visa').alias('visa_code'),
    'gender'
)
us_immigration_table.show(3)
us_immigration_table.printSchema()
us_immigration_table.write.parquet(output_path + 'fact_immigration.parquet', 'overwrite')

+------+---------+-----------+------------+--------------+----+---------+------+
|  year|port_iata|       city|arrival_date|departure_date| age|visa_code|gender|
+------+---------+-----------+------------+--------------+----+---------+------+
|2016.0|      HOU|    Houston|     20574.0|       20581.0|27.0|      2.0|     M|
|2016.0|      NEW|New Orleans|     20574.0|       20576.0|44.0|      2.0|     M|
|2016.0|      WAS| Washington|     20574.0|       20596.0|38.0|      2.0|     M|
+------+---------+-----------+------------+--------------+----+---------+------+
only showing top 3 rows

root
 |-- year: double (nullable = true)
 |-- port_iata: string (nullable = true)
 |-- city: string (nullable = true)
 |-- arrival_date: double (nullable = true)
 |-- departure_date: double (nullable = true)
 |-- age: double (nullable = true)
 |-- visa_code: double (nullable = true)
 |-- gender: string (nullable = true)



In [50]:
# Create dim_visa

visa_data = us_immigration_join.select(
    col('i94visa').alias('visa_code'),
    col('visatype').alias('type')).dropDuplicates()
visa_data.show(3)
visa_data.printSchema()
visa_data.write.parquet(output_path + 'dim_visa.parquet', 'overwrite')

+---------+----+
|visa_code|type|
+---------+----+
|      1.0|  B1|
|      2.0|  WT|
|      1.0|  E2|
+---------+----+
only showing top 3 rows

root
 |-- visa_code: double (nullable = true)
 |-- type: string (nullable = true)



### Data Quality Checks

After running the pipeline, two data quality checks are performed:
- Check that each table was saved to S3
- Check that each table contains data

In [None]:
data_sources = ['fact_immigration.parquet', 'dim_temperature.parquet', 'dim_city.parquet',
                'dim_visa.parquet', 'dim_demographics.parquet']

for source in data_sources:
    # Check 1: the table was written to S3 and can be read back
    try:
        data = spark.read.parquet(output_path + source)
    except Exception as exc:
        raise ValueError(f"Data quality check failed. {source} is not created on s3") from exc

    # Check 2: the table contains at least one row
    if data.count() < 1:
        raise ValueError(f"Data quality check failed. {source} returned no results")
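The loop above stops at the first failing table. The following variant runs every check and reports all failures together. It takes the reader as a function, so the same checks can run against a local path or be exercised without S3. `run_quality_checks` is a hypothetical name:

```python
from typing import Callable, Iterable, List


def run_quality_checks(read_parquet: Callable, base_path: str, tables: Iterable[str]) -> List[str]:
    """Return failure messages; an empty list means every check passed."""
    failures = []
    for table in tables:
        try:
            df = read_parquet(base_path + table)
        except Exception as exc:  # e.g. the path does not exist on S3
            failures.append(f"{table}: could not be read ({exc})")
            continue
        if df.count() < 1:
            failures.append(f"{table}: contains no rows")
    return failures


# Usage in the notebook:
# problems = run_quality_checks(spark.read.parquet, output_path, data_sources)
# if problems:
#     raise ValueError("Data quality checks failed:\n" + "\n".join(problems))
```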

### Data Dictionary
dim_temperature
- **city**: name of city
- **average_temperature**: average temperature (°C) from a single monthly record of the city (the row kept by `dropDuplicates`)
- **country**: country the city belongs to
- **latitude**: latitude of the city
- **longitude**: longitude of the city

dim_demographics
- **city**: name of city
- **state**: name of the state the city is in
- **median**: median age
- **males**: male population
- **females**: female population
- **total_population**: total population
- **num_of_veterans**: number of veterans
- **foreign_born**: foreign-born population
- **avg_house_size**: average household size

dim_city
- **city**: name of city
- **state**: name of the state the city is in
- **state_code**: code of the state

fact_immigration 
- **year**: year of immigration
- **port_iata**: iata port code
- **city**: name of city
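- **arrival_date**: arrival date of the respondent, as a SAS date number (days since 1960-01-01)
- **departure_date**: departure date of the respondent, as a SAS date number (days since 1960-01-01)
- **age**: age of the respondent
- **visa_code**: visa code representing the visa category (Business/Pleasure/Student)
- **gender**: gender of the respondent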

dim_visa
- **visa_code**: visa code to represent visa type Business/Pleasure/Student
- **type**: visa type 

# Conclusion

The project uses Spark as the processing tool and S3 as storage.
Spark was chosen for its ability to process large amounts of data quickly,
and S3 was chosen for its durable storage and the option to use other Amazon tools on the
data if needed.

The star schema was used to model the data because it makes the data easy to query
and easier for analysts and data scientists to reason about.

The pipeline runs as a batch job, which means it has to be requested by the analyst or data scientist;
they can ask for a periodic update if needed.

If the data increased 100x, the ETL pipeline script could be deployed on Amazon EMR with Spark installed,
with the cluster configured to autoscale and add Spark workers when more computing power is needed.

If the data needs to be updated periodically, for example at 7 am every morning,
a cron job can be set up to run the pipeline, or, if more control is needed, an orchestration tool such as Apache Airflow can be used.

If the data needs to be accessed by 100 people, the approach depends on the BI/dashboard tools that use the data as a source.
Since the data is stored on S3, multiple dashboard applications can read from it concurrently.