# US Immigration and City Analytics

#### Project Summary
This project is aimed to analyize the correlation between immigrants and city's satistics in the US.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

In [2]:
# create spark session
spark = SparkSession \
    .builder \
    .appName("Capstone project") \
    .getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
The questions I have are as following:  
1. Which countries do many immigrants come from by type of visa?
2. What is the city and its median age that each immigrants group -in the age under 20, 21-25, 26-40, 41-60, and above 61 - prefer to live in?
3. What is the most preferred state for immigrants by their nation and what is the racial ratio in the state?
4. What is foreign-born rate of immigrants' top 10 cities?  

To solve above questions, I will gather data, extract what are needed, transform them, and load to parquet file as an analytic table.  
Therefore the data I want to extract in the end includes:  
1. immigrants' departure, destination, age, and visa type
2. each city's racial, foreign-born, total population, and median age
3. country code and its name
As the size of immigrants data is too large, I use `Apache Spark` to process data with parallel computing.

#### Describe and Gather Data 
1. **Immigration data** : `../../data/18-83510-I94-Data-2016/i94_jun16_sub.sas7bdat`  
I94 Immigration Data: This data comes from the US National Tourism and Trade Office.  
![immigration](img/immigration.png)  
2. **US cities statistic data** : `immigration_data_sample.csv`  
This data is for cities population, age, race etc... coming from OpenSoft.  
![city](img/city.png)  
3. **Country code data** : `country_code.txt`  
Country code is necessary to understand where immigrants came from. This is extracted from immigration data description.  
![country](img/country.PNG)

In [3]:
# read immigration data
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_imgr = pd.read_sas(fname, encoding="ISO-8859-1")

In [94]:
# read cities data
df_cities = spark.read.csv("us-cities-demographics.csv", sep=';', header=True, inferSchema=True)

In [5]:
# read country code data
f = open("country_code.txt", "r")
lines = f.readlines()

### Step 2: Explore and Assess the Data
#### Explore the Data 
There are some issues with data.
1. For immigration data, there exist NA/NaN values.

In [6]:
# select columns to use
df_imgr = df_imgr[['cicid','i94yr','i94mon','i94cit',
                   'i94res','i94port','i94addr',
                   'biryear','gender','visatype']]

In [7]:
# fill NA/NaN values with 0
df_imgr = df_imgr.fillna(0)

In [9]:
# reset type of each column
df_imgr = df_imgr.astype({'cicid':'int32',
           'i94yr':'int32',
           'i94mon':'int32',
           'i94cit':'int32',
           'i94res':'int32',
           'i94port':'object',
           'i94addr':'object',
           'biryear':'int32',
           'gender':'object',
           'visatype':'object'
})

# set schema for spark dataframe
imgr_schema = StructType([
    StructField('id', IntegerType()),
    StructField('year', IntegerType()),
    StructField('month', IntegerType()),
    StructField('cit', IntegerType()),
    StructField('res', IntegerType()),
    StructField('port', StringType()),
    StructField('addr', StringType()),
    StructField('birth', IntegerType()),
    StructField('gender', StringType()),
    StructField('visa', StringType())
])

# create spark dataframe
df_imgr = spark.createDataFrame(df_imgr, schema=imgr_schema)

2. For 5 datasets from us-cities-demographics, some rows are duplicated.

In [96]:
# select columns for age table
df_age = df_cities\
    .select("City", 
            "State",
            col("State Code").alias("State_Code"),
            col("Median Age").alias("Median_age"))

# select columns for foreign table
df_foreign = df_cities\
    .select("City", 
            "State",
            col("State Code").alias("State_Code"),
            col("Foreign-born").alias("Foreign_Born"))

# select columns for population table
df_population = df_cities\
    .select("City", 
            "State",
            col("State Code").alias("State_Code"),
            col("Male Population").alias("Male"), 
            col("Female population").alias("Female"), 
            col("total population").alias("total"))

# select columns for race table
df_race = df_cities\
    .select("City", 
            "State",
            col("State Code").alias("State_Code"),
            "Race", 
            "Count")

In [98]:
def check_duplicate():
    """
    Checks if there exists any duplicate data in tables.
    """
    
    result = {}
    # all city should have unique median_age
    result["age"] = df_age.select("City", "State")\
        .groupby("City", "State")\
        .count()\
        .filter(col("count") > 1)\
        .count()
    
    # all city and state should have unique foreign_born
    result["foreign"] = df_foreign.select("City", "State")\
        .groupby("City", "State")\
        .count()\
        .filter(col("count") > 1)\
        .count()
    
    # all city and state should have unique population
    result["population"] = df_population.select("City", "State")\
        .groupby("City", "State")\
        .count()\
        .filter(col("count") > 1)\
        .count()
    
    # all city, state and race should have unique count
    result["race"] = df_race.select("City", "State", "Race")\
        .groupby("City", "State", "Race")\
        .count()\
        .filter(col("count") > 1)\
        .count()
    
    # print results
    for item, value in result.items():
        print(f"{item}: {value}")

# execute function
check_duplicate()

age: 594
foreign: 594
population: 594
race: 0


In [99]:
# drop duplicated records in three tables
df_age = df_age.dropDuplicates()
df_foreign = df_foreign.dropDuplicates()
df_population = df_population.dropDuplicates()

In [100]:
# check again if duplicated records dosn't exist anymore
check_duplicate()

age: 0
foreign: 0
population: 0
race: 0


3. For country data, lines must be splited two columns.

In [27]:
# delete whitespaces and sperating chracter, and add each column name
df_country = pd.DataFrame((line.strip().split('=') for line in lines), columns=["code", "name"])
df_country.head()

Unnamed: 0,code,name
0,582,"'MEXICO Air Sea, and Not Reported (I-94, no ..."
1,236,'AFGHANISTAN'
2,101,'ALBANIA'
3,316,'ALGERIA'
4,102,'ANDORRA'


4. There are still whitespaces and single quote to eliminate.

In [31]:
# strip whitespaces and singe quote
df_country[["name"]] = df_country.name.map(lambda x: x.strip().strip("'"))
df_country.head()

Unnamed: 0,code,name
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


5. And first column should be converted integer type.

In [28]:
df_country.dtypes

code    object
name    object
dtype: object

In [29]:
# change data type of code to integer for later join
df_country = df_country.astype({"code": "int32"})
df_country.dtypes

code     int32
name    object
dtype: object

In [32]:
# convert dataframe to spark one
df_country = spark.createDataFrame(df_country)

#### Staging Steps
Write data to staging table as a parquet format.

In [101]:
# immigrant data is large, so paritition them 
df_imgr.write.parquet("immigration", mode="overwrite", partitionBy=["cit"])
df_age.write.parquet("age", mode="overwrite")
df_population.write.parquet("population", mode="overwrite")
df_foreign.write.parquet("foreign", mode="overwrite")
df_race.write.parquet("race", mode="overwrite")
df_country.write.parquet("country", mode="overwrite")

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Data in staging tables are extracted from original data. Each tables are seperated by columns' relations but can be joined whenever it's needed.  
In this project, they are aggrerated so to create analytic tables according to objective questions.  

**Staging Tables**  
![ERD](img/staging_tables.PNG)  
**Analytic Tables**  
![ERD](img/analytic.png) 

#### 3.2 Mapping Out Data Pipelines  
1. Read original data, Transform and load data into staging tables  
![pipeline1](img/pipeline1.png)  
  
2. Aggregate staging tables and load data into analytic tables  

  **country_from_by_visa**

  ![pipeline_q1](img/pipeline_q1.png)

  **age_by_state_and_immigrants_age**  

  ![pipeline_q2](img/pipeline_q2.png)  

  **race_by_state**  

  ![pipeline_q3](img/pipeline_q3.png)  

  **foreign_born_by_state**

  ![pipeline_q4](img/pipeline_q4.png)  

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

In [102]:
# create template view for sql query
df_imgr.createOrReplaceTempView("imgr")
df_age.createOrReplaceTempView("age")
df_population.createOrReplaceTempView("population")
df_foreign.createOrReplaceTempView("foreign")
df_race.createOrReplaceTempView("race")
df_country.createOrReplaceTempView("country")

In [82]:
# count immigrations by visa type and their country
df_q1 = spark.sql("""
    SELECT visa, name country, count(*) count
    FROM imgr
    JOIN country
    ON res=code
    GROUP BY visa, name
    UNION
    SELECT visa, '-total-', count(*) count
    FROM imgr
    JOIN country
    ON res=code
    GROUP BY visa
    ORDER BY visa, count DESC
""")
df_q1.show(5)

+----+--------------------+------+
|visa|             country| count|
+----+--------------------+------+
|  B1|             -total-|212410|
|  B1|          CHINA, PRC| 32284|
|  B1|MEXICO Air Sea, a...| 31031|
|  B1|               INDIA| 22485|
|  B1|              BRAZIL| 14044|
+----+--------------------+------+
only showing top 5 rows



In [103]:
# count immigrations by age groups and the state, showing the average of median age in the state
df_q2 = spark.sql("""
    SELECT 
        case when year-birth < 20 then 'under 20'
        when year-birth >=20 and year-birth <=25 then '20-25'
        when year-birth >=26 and year-birth <=40 then '26-40'
        when year-birth >=41 and year-birth <=60 then '41-60'
        when year-birth > 60 then 'above 60' end as Group,
        State, count(*) count, age.Median_age
    FROM imgr
    JOIN (SELECT State, State_Code, cast(avg(Median_age) as int) Median_age
            FROM age
            GROUP BY State, State_Code) age
    ON addr=state_code
    GROUP BY Group, State, Median_age
    ORDER BY Group, count DESC
""")
df_q2.show(5)

+-----+----------+-----+----------+
|Group|     State|count|Median_age|
+-----+----------+-----+----------+
|20-25|  New York|41284|        35|
|20-25|   Florida|33831|        39|
|20-25|California|33033|        36|
|20-25|    Hawaii| 8225|        41|
|20-25|     Texas| 6191|        33|
+-----+----------+-----+----------+
only showing top 5 rows



In [107]:
# count immigrants by country came from and the state visited, showing race rate in the state
df_q3 = spark.sql("""
    SELECT country.name as Country_from, t2.State as State_to, count(*) count, t2.Race, t2.Count as rate
    FROM imgr
    JOIN country
    ON res=code
    JOIN (SELECT race.State, race.State_Code, race.Race, ROUND(100*sum(Count)/t1.total, 2) Count
        FROM race
        JOIN (SELECT race.State, sum(Count) total
            FROM race
            GROUP BY race.State) t1
        ON race.State=t1.State
        GROUP BY race.State, race.State_Code, race.Race, t1.total) t2
    ON imgr.addr=t2.State_Code
    GROUP BY Country_from, State_to, t2.Race, rate
    ORDER BY Country_from, count DESC, rate DESC
""")
df_q3.show(5)

+------------+------------+-----+-----+-----+
|Country_from|    State_to|count| Race| rate|
+------------+------------+-----+-----+-----+
| AFGHANISTAN|North Dakota|    1|White|86.82|
| AFGHANISTAN|      Oregon|    1|White|74.21|
| AFGHANISTAN|      Kansas|    1|White|69.88|
| AFGHANISTAN|    Colorado|    3|White|68.69|
| AFGHANISTAN|   Minnesota|    1|White|67.89|
+------------+------------+-----+-----+-----+
only showing top 5 rows



In [110]:
# count immigrations by the state and show the percentage of foreign born in the state
df_q4 = spark.sql("""
    SELECT t1.State, round(100*Foreign_Born/total, 2) Foreign_Born, count(*) as count
    FROM (SELECT foreign.state, foreign.state_code, sum(foreign.Foreign_Born) Foreign_Born, sum(population.total) total
            FROM foreign
            JOIN population
            ON foreign.city=population.city
            GROUP BY foreign.state, foreign.state_code) t1
    JOIN imgr
    ON imgr.addr=t1.State_Code
    GROUP BY t1.State, Foreign_Born, total
    ORDER BY count DESC
""")
df_q4.show(5)

+----------+------------+------+
|     State|Foreign_Born| count|
+----------+------------+------+
|   Florida|       25.85|621701|
|  New York|       34.68|553677|
|California|       30.11|470386|
|    Hawaii|       28.72|168764|
|     Texas|       20.82|134321|
+----------+------------+------+
only showing top 5 rows



In [111]:
# save all data as parquet file
df_q1.write.parquet("country_from_by_visa", mode="overwrite")
df_q2.write.parquet("age_by_state_and_immigrants_age", mode="overwrite")
df_q3.write.parquet("race_by_state", mode="overwrite")
df_q4.write.parquet("foreign_born_by_state", mode="overwrite")

#### 4.2 Data Quality Checks 

Check if there exists any duplicate data.

In [66]:
# immigration data should have unique id
df_imgr.select("id")\
    .groupby("id")\
    .count()\
    .filter(col("count") > 1)\
    .count()

0

In [67]:
# country data should have unique code
df_country.select("Code")\
    .groupby("Code")\
    .count()\
    .filter(col("count") > 1)\
    .count()

0

In [None]:
# check other staging tables
check_duplicate()

Read parquet files and check record counts and sample data for all tables.

In [118]:
def quality_check(fname):
    """
    Reads a parquet file and prints schema, row count, and sample data.
    """
    # print table name
    print(f"### {fname} ###")
    
    # read parquet file
    df = spark.read.parquet(fname)
    
    # print schema
    df.printSchema()
    
    # print total records count
    print(f"total records: {df.count()}")
    
    #print sample data
    df.show(5)

In [119]:
quality_check("immigration")
quality_check("age")
quality_check("population")
quality_check("race")
quality_check("foreign")
quality_check("country")
quality_check("country_from_by_visa")
quality_check("age_by_state_and_immigrants_age")
quality_check("race_by_state")
quality_check("foreign_born_by_state")

### immigration ###
root
 |-- id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- res: integer (nullable = true)
 |-- port: string (nullable = true)
 |-- addr: string (nullable = true)
 |-- birth: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- visa: string (nullable = true)
 |-- cit: integer (nullable = true)

total records: 3096313
+-------+----+-----+---+----+----+-----+------+----+---+
|     id|year|month|res|port|addr|birth|gender|visa|cit|
+-------+----+-----+---+----+----+-----+------+----+---+
|4284274|2016|    4|102| NYC|  NY| 1968|     M|  WB|135|
|  27540|2016|    4|103| LOS|   0| 1981|     M|  B2|135|
|4284275|2016|    4|104| DEN|  CO| 1951|     0|  WB|135|
|  27541|2016|    4|104| CLT|  FL| 1949|     F|  WT|135|
|4284276|2016|    4|104| NYC|  NY| 1943|     F|  WT|135|
+-------+----+-----+---+----+----+-----+------+----+---+
only showing top 5 rows

### age ###
root
 |-- City: string (nullable = 

#### 4.3 Data Dictionary

| Table Name | Attributes Name | Contents | Type | Format | PK or FK | FK reference Table |
| :--- | :--- | :--- | :--- | :--- | :--- | :--- | 
| age | city | city name in the US | string | Rockford | PK |  |
| age | state | state name of the city | string | Kentucky | PK |
| age | state_code | 2-character state code | string | KY | |
| age | median_age | people's age in the middle | double | 36.3 | | |
| population | city | city name in the US | string | San Diego | PK | |
| population | state | state name of the city | string | Kentucky | PK |
| population | state_code | 2-character state code | string | KY | |
| population | male | male population in the city | integer | 693826 | |
| population | female | female population in the city | integer | 701081 | |
| population | total | total population in the city | integer | 1394907 | |
| race | city | city name in the US | string | Albany | PK |  |
| race | state | state name of the city | string | Kentucky | PK |
| race | state_code | 2-character state code | string | KY | |
| race | race | race type | string | Hispanic or Latino | |
| race | count | population of each race | integer | 9266 | |
| foreign | city | city name in the US | string | Scranton | PK | |
| foreign | state | state name of the city | string | Kentucky | PK |
| foreign | state_code | 2-character state code | string | KY | |
| foreign | foreign_born | the number of foreign born in the city | integer | 8069 | |
| country | code | 3-digit country code | long | 101 | PK | |
| country | name | country name | string | ALBANIA | |
| immigration | id | immigrant's id | integer | PK | |
| immigration | year | year when entered | integer | 4284274 | |
| immigration | month | month when entered | integer | 2016 | |
| immigration | res | 3-digit country code of departure | integer | 4 | FK | country |
| immigration | port | airport code of destination | string | 102  | |
| immigration | addr | state code of immigrant's address in the US | string | NYC | FK | population |
| immigration | birth | year of immigrant's birth | integer | NY | |
| immigration | gender | immigrant's genter | string | 1968 | |
| immigration | visa | immigrant's visa type | string | B1 | |
| immigration | cit | country code of immigrant's citizenship | integer | 135 | |
| country_from_by_visa | visa | immigrant's visa type | string | B1 | |
| country_from_by_visa | country | country name where they're from | string | INDIA | |
| country_from_by_visa | count | number of corresponding immigrants | long | 32284 | |
| age_by_state_and_immigrants_age | group | age group of immigrants | string | under 20 | |
| age_by_state_and_immigrants_age | state | state name of destination | string | New York | |
| age_by_state_and_immigrants_age | count | number of corresponding immigrants | long | 41284 | |
| age_by_state_and_immigrants_age | median_age | people's middle age in the state | integer | 36 | |
| race_by_state | country_from | country name where they're from | string | AFGHANISTAN | |
| race_by_state | state_to | state name of destination | string | California | |
| race_by_state | count | number of corresponding immigrants | long | 34 | |
| race_by_state | race | race type | string | string | White | |
| race_by_state | rate | percentage of each race population | double | 47.34 | |
| foreign_born_by_state | state | state name of destination | string | Florida | |
| foreign_born_by_state | foreign_born | the number of foreign born in the state | double |
26.8 | |
| foreign_born_by_state | count | number of corresponding immigrants | long | 621701 | |


#### Step 5: Complete Project Write Up
* **Clearly state the rationale for the choice of tools and technologies for the project.**  
  Immigration data file has hundreds of thousands records only for 1 month.  
  Spark is good for big data to read fast, therefore I processed ETL pipeline using Spark.  
  I chose parquet format to save data because parquet is column-oriented data storage format with efficient data compression and encoding schemes.  
* **Propose how often the data should be updated and why.**  
  The most important data in this project is immigration which is updated by month.  
  Therefore ETL process should be run by month.  
* (Write a description of how you would approach the problem differently under the following scenarios:)  
 * **The data was increased by 100x.**  
 ```
   For increased data size, I would need to scale up system horizontally.
   Using cloud computing service like AWS EMR is a good choice to deal with bigger data using more powerful computing nodes in parallel.  
   ```
 * **The data populates a dashboard that must be updated on a daily basis by 7am every day.**
```
   I would use Apache Airflow for scheduling the ETL pipeline.  
   Apache Airflow helps automize the ETL pipeline with wide extension to other tools.
   ```
 * **The database needed to be accessed by 100+ people.**  
   ```
   To ensure high availability, database would be changed into NoSQL database.  
   As relational database is not distributed and has one point to fail, it is not appropriate to multi user access.  
   By using NoSQL database I could provide service with high availability.
   ```