# Data Engineering Capstone Project - Stage 1

## Step 1. Gathering data 

### Importing modules needed for this project

In [1]:
from pyspark.sql.types import *
from pyspark.sql import functions as F

### Creating and configuring SparkSession

In [2]:
import findspark
findspark.init('/path/to/spark/home')

from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.2,com.amazonaws:aws-java-sdk:1.7.4") \
        .enableHiveSupport().getOrCreate()

spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "")

### Getting data: Immigration Dataframe

In [3]:
immigration_df = spark.read.format('com.github.saurfang.sas.spark').load('/path/to/i94_apr16_sub.sas7bdat').dropDuplicates()

In [4]:
immigration_df.count()

3096313

In [7]:
immigration_df.show(10)

+------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
| cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
| 474.0|2016.0|   4.0| 103.0| 103.0|    NEW|20545.0|    2.0|   null|20547.0|  25.0|    2.0|  1.0|20160401|    null| null|      G|      O|   null|      M| 1991.0|06292016|     F|  null|    VES|5.5410441233E10|91285|      WT|
|1508.0|2016.0|   4.0| 104.0| 104.0|    NYC|20545.0|    1.0|     NY|20552.0|  16.0|    2.0|  1.0|2016040

In [8]:
immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

### Changing data types of the columns

In [4]:
immigration_df = immigration_df \
        .withColumn("cicid", immigration_df["cicid"].cast(IntegerType())) \
        .withColumn("i94yr", immigration_df["i94yr"].cast(IntegerType())) \
        .withColumn("i94mon", immigration_df["i94mon"].cast(IntegerType())) \
        .withColumn("i94cit", immigration_df["i94cit"].cast(IntegerType())) \
        .withColumn("i94res", immigration_df["i94res"].cast(IntegerType())) \
        .withColumn("arrdate", immigration_df["arrdate"].cast(StringType())) \
        .withColumn("i94mode", immigration_df["i94mode"].cast(IntegerType())) \
        .withColumn("depdate", immigration_df["depdate"].cast(StringType())) \
        .withColumn("i94bir", immigration_df["i94bir"].cast(IntegerType())) \
        .withColumn("i94visa", immigration_df["i94visa"].cast(IntegerType())) \
        .withColumn("count", immigration_df["count"].cast(IntegerType())) \

In [10]:
immigration_df.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nu

In [5]:
immigration_df = immigration_df.select("cicid","i94yr","i94mon","i94cit","i94res","i94port","arrdate","i94mode","i94addr","depdate","i94bir","i94visa","count","gender","airline","fltno","visatype")

In [12]:
immigration_df.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94yr: integer (nullable = true)
 |-- i94mon: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [13]:
immigration_df.show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+------+-------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|gender|airline|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+------+-------+-----+--------+
|  474| 2016|     4|   103|   103|    NEW|20545.0|      2|   null|20547.0|    25|      2|    1|     F|    VES|91285|      WT|
| 1508| 2016|     4|   104|   104|    NYC|20545.0|      1|     NY|20552.0|    16|      2|    1|     F|     LX|00016|      WT|
| 1669| 2016|     4|   104|   104|    NYC|20545.0|      1|     FL|20561.0|    57|      2|    1|     M|     AA|00039|      WT|
| 2025| 2016|     4|   104|   104|    NYC|20545.0|      1|     NY|20549.0|    51|      2|    1|  null|     SN|01401|      WT|
| 2048| 2016|     4|   104|   104|    MIA|20545.0|      1|     FL|20554.0|     3|      2|    1|  null|     UX|00097|  

### Getting data: Demographics Dataframe

In [6]:
demographics_df = spark.read.format("csv").option("delimiter", ";").option("header","true").load("/path/to/us-cities-demographics.csv").dropDuplicates()

In [11]:
demographics_df.count()

2891

In [16]:
demographics_df.show(10)

+-------------+------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|         City|       State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+-------------+------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|      Lynwood|  California|      29.4|          35634|            36371|           72005|               776|       28061|                  4.43|        CA|Black or African-...|  5346|
|    Hollywood|     Florida|      41.4|          75358|            74363|          149721|              6056|       55158|                  2.65|        FL|               White|107916|
|      Fremont|  California|      38.3|         114383|           117808|  

In [17]:
demographics_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



### Changing data types of the columns and renaming columns

In [7]:
demographics_df = demographics_df \
        .withColumn("Total Population", demographics_df["Total Population"].cast(IntegerType())) \
        .withColumn("Male Population", demographics_df["Male Population"].cast(IntegerType())) \
        .withColumn("Female Population", demographics_df["Female Population"].cast(IntegerType())) \
        .withColumn("Number of Veterans", demographics_df["Number of Veterans"].cast(IntegerType())) \
        .withColumn("Foreign-born", demographics_df["Foreign-born"].cast(IntegerType())) \
        .withColumn("Count", demographics_df["Count"].cast(IntegerType()))

In [8]:
demographics_df = demographics_df \
        .withColumnRenamed("City", "city") \
        .withColumnRenamed("State", "state") \
        .withColumnRenamed("Median Age", "median_age") \
        .withColumnRenamed("Male Population", "male_population") \
        .withColumnRenamed("Female Population", "female_population") \
        .withColumnRenamed("Total Population", "total_population") \
        .withColumnRenamed("Number of Veterans", "num_of_veterans") \
        .withColumnRenamed("Foreign-born", "foreign_born") \
        .withColumnRenamed("Average Household Size", "avg_household_size") \
        .withColumnRenamed("State Code", "state_code") \
        .withColumnRenamed("Race", "race") \
        .withColumnRenamed("Count", "count") \

In [20]:
demographics_df.show()

+--------------------+------------+----------+---------------+-----------------+----------------+---------------+------------+------------------+----------+--------------------+------+
|                city|       state|median_age|male_population|female_population|total_population|num_of_veterans|foreign_born|avg_household_size|state_code|                race| count|
+--------------------+------------+----------+---------------+-----------------+----------------+---------------+------------+------------------+----------+--------------------+------+
|             Lynwood|  California|      29.4|          35634|            36371|           72005|            776|       28061|              4.43|        CA|Black or African-...|  5346|
|           Hollywood|     Florida|      41.4|          75358|            74363|          149721|           6056|       55158|              2.65|        FL|               White|107916|
|             Fremont|  California|      38.3|         114383|           11

## Step 2. ETL - Extracting and transforming data

### Transforming data in order to make tables

In [9]:
pivot_demographics_df = demographics_df.groupBy("state_code").pivot("race").sum("count").sort("state_code")

In [10]:
pivot_demographics_df = pivot_demographics_df \
        .withColumnRenamed("American Indian and Alaska Native", "indian") \
        .withColumnRenamed("Asian", "asian") \
        .withColumnRenamed("Black or African-American", "black") \
        .withColumnRenamed("Hispanic or Latino", "hispanic") \
        .withColumnRenamed("White", "white")

In [67]:
pivot_demographics_df.show()

+----------+------+-------+-------+--------+--------+
|state_code|indian|  asian|  black|hispanic|   white|
+----------+------+-------+-------+--------+--------+
|        AK| 36339|  36825|  23107|   27261|  212696|
|        AL|  8084|  28769| 521068|   39313|  498920|
|        AR|  9381|  22062| 149608|   77813|  384733|
|        AZ|129708| 229183| 296222| 1508157| 3591611|
|        CA|401386|4543730|2047009| 9856464|14905129|
|        CO| 62613| 148790| 208043|  703722| 2463916|
|        CT| 10729|  48311| 231822|  309992|  505674|
|        DC|  6130|  35072| 328786|   71129|  285402|
|        DE|   414|   1193|  44182|    5516|   23743|
|        FL| 46759| 264933|1652619| 1942022| 4758144|
|        GA| 15746|  94136| 779361|  114366|  830002|
|        HI|  5592| 240978|  11781|   24586|  110508|
|        IA|  9141|  43230|  75166|   66424|  610814|
|        ID|  6705|  13985|   7822|   48142|  370314|
|        IL| 35097| 374589|1130574| 1215659| 2620068|
|        IN| 16532|  77906| 

In [11]:
state_list_demographics = []
for i in range(pivot_demographics_df.select("state_code").dropDuplicates().count()):
    state_list_demographics.append(str(pivot_demographics_df.select("state_code").dropDuplicates().collect()[i]["state_code"]))

In [17]:
len(state_list_demographics)

49

In [12]:
immigration_df_filtered = immigration_df.filter(F.col("i94addr").isin(state_list_demographics))

In [27]:
immigration_df_filtered.show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+------+-------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|gender|airline|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+------+-------+-----+--------+
| 1508| 2016|     4|   104|   104|    NYC|20545.0|      1|     NY|20552.0|    16|      2|    1|     F|     LX|00016|      WT|
| 1669| 2016|     4|   104|   104|    NYC|20545.0|      1|     FL|20561.0|    57|      2|    1|     M|     AA|00039|      WT|
| 2025| 2016|     4|   104|   104|    NYC|20545.0|      1|     NY|20549.0|    51|      2|    1|  null|     SN|01401|      WT|
| 2048| 2016|     4|   104|   104|    MIA|20545.0|      1|     FL|20554.0|     3|      2|    1|  null|     UX|00097|      WT|
| 2294| 2016|     4|   107|   107|    CHI|20545.0|      1|     IL|20575.0|    54|      2|    1|     F|     LO|    3|  

In [28]:
immigration_df_filtered.count()

2820069

### Extracting fact table from spark dataframes

In [13]:
fact_source_immigration_df = immigration_df_filtered.groupBy("i94addr").agg(F.sum(immigration_df_filtered["count"]).alias("immigration_count"))

In [30]:
fact_source_immigration_df.show()

+-------+-----------------+
|i94addr|immigration_count|
+-------+-----------------+
|     SC|             9811|
|     AZ|            20218|
|     LA|            22655|
|     MN|            11194|
|     NJ|            76531|
|     DC|            28228|
|     OR|            12574|
|     VA|            31399|
|     RI|             3289|
|     KY|             5790|
|     NH|             2817|
|     MI|            32101|
|     NV|           114609|
|     WI|             7860|
|     ID|             1752|
|     CA|           470386|
|     CT|            13991|
|     NE|            26574|
|     MT|             1339|
|     NC|            23375|
+-------+-----------------+
only showing top 20 rows



In [31]:
fact_source_immigration_df.count()

49

In [14]:
fact_source_demographics_df = pivot_demographics_df

In [33]:
fact_source_demographics_df.show()

+----------+------+-------+-------+--------+--------+
|state_code|indian|  asian|  black|hispanic|   white|
+----------+------+-------+-------+--------+--------+
|        AK| 36339|  36825|  23107|   27261|  212696|
|        AL|  8084|  28769| 521068|   39313|  498920|
|        AR|  9381|  22062| 149608|   77813|  384733|
|        AZ|129708| 229183| 296222| 1508157| 3591611|
|        CA|401386|4543730|2047009| 9856464|14905129|
|        CO| 62613| 148790| 208043|  703722| 2463916|
|        CT| 10729|  48311| 231822|  309992|  505674|
|        DC|  6130|  35072| 328786|   71129|  285402|
|        DE|   414|   1193|  44182|    5516|   23743|
|        FL| 46759| 264933|1652619| 1942022| 4758144|
|        GA| 15746|  94136| 779361|  114366|  830002|
|        HI|  5592| 240978|  11781|   24586|  110508|
|        IA|  9141|  43230|  75166|   66424|  610814|
|        ID|  6705|  13985|   7822|   48142|  370314|
|        IL| 35097| 374589|1130574| 1215659| 2620068|
|        IN| 16532|  77906| 

In [15]:
fact_table = fact_source_immigration_df.join(fact_source_demographics_df, fact_source_immigration_df.i94addr == fact_source_demographics_df.state_code).withColumn("fact_id", F.monotonically_increasing_id())

In [31]:
fact_table.show()

+-------+-----------------+----------+------+-------+-------+--------+--------+------------+
|i94addr|immigration_count|state_code|indian|  asian|  black|hispanic|   white|     fact_id|
+-------+-----------------+----------+------+-------+-------+--------+--------+------------+
|     SC|             9811|        SC|  3705|  13355| 175064|   29863|  343764| 34359738368|
|     AZ|            20218|        AZ|129708| 229183| 296222| 1508157| 3591611| 34359738369|
|     LA|            22655|        LA|  8263|  38739| 602377|   87133|  654578| 77309411328|
|     MN|            11194|        MN| 25242| 151544| 216731|  103229| 1050239| 85899345920|
|     NJ|            76531|        NJ| 11350| 116844| 452202|  600437|  615083|137438953472|
|     DC|            28228|        DC|  6130|  35072| 328786|   71129|  285402|180388626432|
|     OR|            12574|        OR| 38597| 117279|  72150|  201498| 1235819|257698037760|
|     VA|            31399|        VA| 26160| 167784| 771569|  216760|

In [16]:
fact_table = fact_table.select("fact_id","state_code","immigration_count","white","asian","black","hispanic","indian")

In [42]:
fact_table.show()

+------------+----------+-----------------+--------+-------+-------+--------+------+
|     fact_id|state_code|immigration_count|   white|  asian|  black|hispanic|indian|
+------------+----------+-----------------+--------+-------+-------+--------+------+
| 34359738368|        SC|             9811|  343764|  13355| 175064|   29863|  3705|
| 34359738369|        AZ|            20218| 3591611| 229183| 296222| 1508157|129708|
| 77309411328|        LA|            22655|  654578|  38739| 602377|   87133|  8263|
| 85899345920|        MN|            11194| 1050239| 151544| 216731|  103229| 25242|
|137438953472|        NJ|            76531|  615083| 116844| 452202|  600437| 11350|
|180388626432|        DC|            28228|  285402|  35072| 328786|   71129|  6130|
|257698037760|        OR|            12574| 1235819| 117279|  72150|  201498| 38597|
|360777252864|        VA|            31399| 1428158| 167784| 771569|  216760| 26160|
|403726925824|        RI|             3289|  287304|  24245|  555

In [43]:
fact_table.printSchema()

root
 |-- fact_id: long (nullable = false)
 |-- state_code: string (nullable = true)
 |-- immigration_count: long (nullable = true)
 |-- white: long (nullable = true)
 |-- asian: long (nullable = true)
 |-- black: long (nullable = true)
 |-- hispanic: long (nullable = true)
 |-- indian: long (nullable = true)



In [17]:
fact_table = fact_table \
        .withColumn("immigration_count", fact_table["immigration_count"].cast(IntegerType())) \
        .withColumn("white", fact_table["white"].cast(IntegerType())) \
        .withColumn("asian", fact_table["asian"].cast(IntegerType())) \
        .withColumn("black", fact_table["black"].cast(IntegerType())) \
        .withColumn("hispanic", fact_table["hispanic"].cast(IntegerType())) \
        .withColumn("indian", fact_table["indian"].cast(IntegerType()))

In [60]:
fact_table.printSchema()

root
 |-- fact_id: long (nullable = false)
 |-- state_code: string (nullable = true)
 |-- immigration_count: integer (nullable = true)
 |-- white: integer (nullable = true)
 |-- asian: integer (nullable = true)
 |-- black: integer (nullable = true)
 |-- hispanic: integer (nullable = true)
 |-- indian: integer (nullable = true)



In [25]:
fact_table.show()

+------------+----------+-----------------+--------+-------+-------+--------+------+
|     fact_id|state_code|immigration_count|   white|  asian|  black|hispanic|indian|
+------------+----------+-----------------+--------+-------+-------+--------+------+
| 34359738368|        SC|             9811|  343764|  13355| 175064|   29863|  3705|
| 34359738369|        AZ|            20218| 3591611| 229183| 296222| 1508157|129708|
| 77309411328|        LA|            22655|  654578|  38739| 602377|   87133|  8263|
| 85899345920|        MN|            11194| 1050239| 151544| 216731|  103229| 25242|
|137438953472|        NJ|            76531|  615083| 116844| 452202|  600437| 11350|
|180388626432|        DC|            28228|  285402|  35072| 328786|   71129|  6130|
|257698037760|        OR|            12574| 1235819| 117279|  72150|  201498| 38597|
|360777252864|        VA|            31399| 1428158| 167784| 771569|  216760| 26160|
|403726925824|        RI|             3289|  287304|  24245|  555

### Extracting dimemsion tables from spark dataframes

#### dim_state_table

In [39]:
demographics_df.show()

+--------------------+------------+----------+---------------+-----------------+----------------+---------------+------------+------------------+----------+--------------------+------+
|                city|       state|median_age|male_population|female_population|total_population|num_of_veterans|foreign_born|avg_household_size|state_code|                race| count|
+--------------------+------------+----------+---------------+-----------------+----------------+---------------+------------+------------------+----------+--------------------+------+
|             Lynwood|  California|      29.4|          35634|            36371|           72005|            776|       28061|              4.43|        CA|Black or African-...|  5346|
|           Hollywood|     Florida|      41.4|          75358|            74363|          149721|           6056|       55158|              2.65|        FL|               White|107916|
|             Fremont|  California|      38.3|         114383|           11

In [18]:
dim_state_table = demographics_df.groupBy("state_code","state").agg(F.sum(demographics_df["foreign_born"])).select("state_code","state")

In [55]:
dim_state_table.show()

+----------+--------------------+
|state_code|               state|
+----------+--------------------+
|        MT|             Montana|
|        NC|      North Carolina|
|        MD|            Maryland|
|        CO|            Colorado|
|        CT|         Connecticut|
|        IL|            Illinois|
|        NJ|          New Jersey|
|        DE|            Delaware|
|        DC|District of Columbia|
|        TN|           Tennessee|
|        LA|           Louisiana|
|        AR|            Arkansas|
|        AK|              Alaska|
|        CA|          California|
|        NM|          New Mexico|
|        UT|                Utah|
|        MI|            Michigan|
|        NY|            New York|
|        NH|       New Hampshire|
|        WA|          Washington|
+----------+--------------------+
only showing top 20 rows



In [42]:
dim_state_table.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- state: string (nullable = true)



#### dim_visa_table

In [43]:
immigration_df_filtered.show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+------+-------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|gender|airline|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+------+-------+-----+--------+
| 1508| 2016|     4|   104|   104|    NYC|20545.0|      1|     NY|20552.0|    16|      2|    1|     F|     LX|00016|      WT|
| 1669| 2016|     4|   104|   104|    NYC|20545.0|      1|     FL|20561.0|    57|      2|    1|     M|     AA|00039|      WT|
| 2025| 2016|     4|   104|   104|    NYC|20545.0|      1|     NY|20549.0|    51|      2|    1|  null|     SN|01401|      WT|
| 2048| 2016|     4|   104|   104|    MIA|20545.0|      1|     FL|20554.0|     3|      2|    1|  null|     UX|00097|      WT|
| 2294| 2016|     4|   107|   107|    CHI|20545.0|      1|     IL|20575.0|    54|      2|    1|     F|     LO|    3|  

In [19]:
pivot_immigration_visa_df = immigration_df_filtered.groupBy("i94addr").pivot("visatype").sum("count")

In [32]:
pivot_immigration_visa_df.show()

+-------+-----+------+----+----+----+----+----+----+----+---+----+----+----+----+-----+------+
|i94addr|   B1|    B2|  CP| CPL|  E1|  E2|  F1|  F2| GMT|  I|  I1|  M1|  M2| SBP|   WB|    WT|
+-------+-----+------+----+----+----+----+----+----+----+---+----+----+----+----+-----+------+
|     AZ| 2013|  6125|  16|   1|  12| 115| 209|  21|   5| 21|   3| 159|   5|null| 3432|  8081|
|     SC| 1150|  2134|null|null|  30| 304|  96|   9|null| 15|null|null|null|null| 2208|  3865|
|     LA| 2642|  6597|  44|null|  10|  76| 179|  11|null| 23|   3|   2|null|null| 4203|  8865|
|     MN| 1972|  3729|null|null|   7|  48| 221|  12|null|  9|null|null|null|null| 2800|  2396|
|     NJ| 4898| 39483|   9|null| 199| 555| 740|  70|null| 77|   5|   8|null|   1| 6782| 23704|
|     DC| 4102|  7390|   1|null|  17|  51| 325|  17|null|276|  17|   3|   1|null| 6236|  9792|
|     OR| 1556|  4264|   6|null|  13| 128| 344|  24|null|  6|null|  25|null|null| 2238|  3970|
|     VA| 2811| 15184|  10|null|  49| 187| 575|  8

In [20]:
dim_visa_table = pivot_immigration_visa_df

In [21]:
dim_visa_table = dim_visa_table \
        .withColumnRenamed("i94addr", "state_code") \
        .withColumn("B1", dim_visa_table["B1"].cast(IntegerType())) \
        .withColumn("B2", dim_visa_table["B2"].cast(IntegerType())) \
        .withColumn("CP", dim_visa_table["CP"].cast(IntegerType())) \
        .withColumn("CPL", dim_visa_table["CPL"].cast(IntegerType())) \
        .withColumn("E1", dim_visa_table["E1"].cast(IntegerType())) \
        .withColumn("E2", dim_visa_table["E2"].cast(IntegerType())) \
        .withColumn("F1", dim_visa_table["F1"].cast(IntegerType())) \
        .withColumn("F2", dim_visa_table["F2"].cast(IntegerType())) \
        .withColumn("GMT", dim_visa_table["GMT"].cast(IntegerType())) \
        .withColumn("I", dim_visa_table["I"].cast(IntegerType())) \
        .withColumn("I1", dim_visa_table["I1"].cast(IntegerType())) \
        .withColumn("M1", dim_visa_table["M1"].cast(IntegerType())) \
        .withColumn("M2", dim_visa_table["M2"].cast(IntegerType())) \
        .withColumn("SBP", dim_visa_table["SBP"].cast(IntegerType())) \
        .withColumn("WB", dim_visa_table["WB"].cast(IntegerType())) \
        .withColumn("WT", dim_visa_table["WT"].cast(IntegerType()))

In [37]:
dim_visa_table.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- B1: integer (nullable = true)
 |-- B2: integer (nullable = true)
 |-- CP: integer (nullable = true)
 |-- CPL: integer (nullable = true)
 |-- E1: integer (nullable = true)
 |-- E2: integer (nullable = true)
 |-- F1: integer (nullable = true)
 |-- F2: integer (nullable = true)
 |-- GMT: integer (nullable = true)
 |-- I: integer (nullable = true)
 |-- I1: integer (nullable = true)
 |-- M1: integer (nullable = true)
 |-- M2: integer (nullable = true)
 |-- SBP: integer (nullable = true)
 |-- WB: integer (nullable = true)
 |-- WT: integer (nullable = true)



In [58]:
dim_visa_table.show()

+----------+-----+------+----+----+----+----+----+----+----+---+----+----+----+----+-----+------+
|state_code|   B1|    B2|  CP| CPL|  E1|  E2|  F1|  F2| GMT|  I|  I1|  M1|  M2| SBP|   WB|    WT|
+----------+-----+------+----+----+----+----+----+----+----+---+----+----+----+----+-----+------+
|        AZ| 2013|  6125|  16|   1|  12| 115| 209|  21|   5| 21|   3| 159|   5|null| 3432|  8081|
|        SC| 1150|  2134|null|null|  30| 304|  96|   9|null| 15|null|null|null|null| 2208|  3865|
|        LA| 2642|  6597|  44|null|  10|  76| 179|  11|null| 23|   3|   2|null|null| 4203|  8865|
|        MN| 1972|  3729|null|null|   7|  48| 221|  12|null|  9|null|null|null|null| 2800|  2396|
|        NJ| 4898| 39483|   9|null| 199| 555| 740|  70|null| 77|   5|   8|null|   1| 6782| 23704|
|        DC| 4102|  7390|   1|null|  17|  51| 325|  17|null|276|  17|   3|   1|null| 6236|  9792|
|        OR| 1556|  4264|   6|null|  13| 128| 344|  24|null|  6|null|  25|null|null| 2238|  3970|
|        VA| 2811| 1

#### dim_foreign_table

In [51]:
demographics_df.show()

+--------------------+------------+----------+---------------+-----------------+----------------+---------------+------------+------------------+----------+--------------------+------+
|                city|       state|median_age|male_population|female_population|total_population|num_of_veterans|foreign_born|avg_household_size|state_code|                race| count|
+--------------------+------------+----------+---------------+-----------------+----------------+---------------+------------+------------------+----------+--------------------+------+
|             Lynwood|  California|      29.4|          35634|            36371|           72005|            776|       28061|              4.43|        CA|Black or African-...|  5346|
|           Hollywood|     Florida|      41.4|          75358|            74363|          149721|           6056|       55158|              2.65|        FL|               White|107916|
|             Fremont|  California|      38.3|         114383|           11

In [22]:
dim_foreign_table = demographics_df.groupBy("state_code").agg(F.sum(demographics_df["foreign_born"]).alias("state_foreign_born"))

In [53]:
dim_foreign_table.show()

+----------+------------------+
|state_code|state_foreign_born|
+----------+------------------+
|        AZ|           3411565|
|        SC|            134019|
|        LA|            417095|
|        MN|           1069888|
|        NJ|           2327750|
|        DC|            475585|
|        OR|            928765|
|        VA|           1346270|
|        RI|            431507|
|        KY|            332440|
|        NH|            135995|
|        MI|           1214547|
|        NV|           2406685|
|        WI|            623435|
|        ID|            140630|
|        CA|          37059662|
|        CT|           1114250|
|        NE|            356105|
|        MT|             29885|
|        NC|           1896635|
+----------+------------------+
only showing top 20 rows



In [54]:
dim_foreign_table.count()

49

In [56]:
dim_foreign_table.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- state_foreign_born: long (nullable = true)



In [23]:
dim_foreign_table = dim_foreign_table \
        .withColumn("state_foreign_born", dim_foreign_table["state_foreign_born"].cast(IntegerType()))

In [30]:
dim_foreign_table.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- state_foreign_born: integer (nullable = true)



In [59]:
dim_foreign_table.show()

+----------+------------------+
|state_code|state_foreign_born|
+----------+------------------+
|        AZ|           3411565|
|        SC|            134019|
|        LA|            417095|
|        MN|           1069888|
|        NJ|           2327750|
|        DC|            475585|
|        OR|            928765|
|        VA|           1346270|
|        RI|            431507|
|        KY|            332440|
|        NH|            135995|
|        MI|           1214547|
|        NV|           2406685|
|        WI|            623435|
|        ID|            140630|
|        CA|          37059662|
|        CT|           1114250|
|        NE|            356105|
|        MT|             29885|
|        NC|           1896635|
+----------+------------------+
only showing top 20 rows



## Step 3. ETL - Loading data

### Loading parquet data to S3 bucket

In [62]:
fact_table.write.mode("overwrite").parquet("s3a://my-bucket/fact_table")
dim_state_table.write.mode('overwrite').parquet("s3a://my-bucket/dim_state_table")
dim_visa_table.write.mode('overwrite').parquet("s3a://my-bucket/dim_visa_table")
dim_foreign_table.write.mode('overwrite').parquet("s3a://my-bucket/dim_foreign_table")

## Step 4. Analytics

### 4-1. Figure out the state that has the most immigrants

In [140]:
import pandas as pd

In [141]:
fact_pdf = fact_table.select("*").toPandas()

In [142]:
fact_pdf.head(5)

Unnamed: 0,fact_id,state_code,immigration_count,white,asian,black,hispanic,indian
0,34359738368,SC,9811,343764,13355,175064,29863,3705
1,34359738369,AZ,20218,3591611,229183,296222,1508157,129708
2,77309411328,LA,22655,654578,38739,602377,87133,8263
3,85899345920,MN,11194,1050239,151544,216731,103229,25242
4,137438953472,NJ,76531,615083,116844,452202,600437,11350


In [143]:
fact_pdf["immigration_count"].max()

621701

In [144]:
fact_pdf[fact_pdf.immigration_count == fact_pdf.immigration_count.max()]

Unnamed: 0,fact_id,state_code,immigration_count,white,asian,black,hispanic,indian
41,1503238553600,FL,621701,4758144,264933,1652619,1942022,46759


In [146]:
str(fact_pdf[fact_pdf.immigration_count == fact_pdf.immigration_count.max()]['state_code'].values[0])

'FL'

### 4-2. Figure out the state that has the least immigrants

In [147]:
fact_pdf["immigration_count"].min()

557

In [148]:
fact_pdf[fact_pdf.immigration_count == fact_pdf.immigration_count.min()]

Unnamed: 0,fact_id,state_code,immigration_count,white,asian,black,hispanic,indian
35,996432412672,SD,557,213281,6859,13121,12359,13782


In [149]:
str(fact_pdf[fact_pdf.immigration_count == fact_pdf.immigration_count.min()]['state_code'].values[0])

'SD'

### 4-3. Figure out the state that has the most ratio of white people

In [50]:
fact_pdf.head(10)

Unnamed: 0,fact_id,state_code,immigration_count,white,asian,black,hispanic,indian
0,34359738368,SC,9811,343764,13355,175064,29863,3705
1,34359738369,AZ,20218,3591611,229183,296222,1508157,129708
2,77309411328,LA,22655,654578,38739,602377,87133,8263
3,85899345920,MN,11194,1050239,151544,216731,103229,25242
4,137438953472,NJ,76531,615083,116844,452202,600437,11350
5,180388626432,DC,28228,285402,35072,328786,71129,6130
6,257698037760,OR,12574,1235819,117279,72150,201498,38597
7,360777252864,VA,31399,1428158,167784,771569,216760,26160
8,403726925824,RI,3289,287304,24245,55556,109226,6369
9,420906795008,KY,5790,705790,32667,202749,50478,7772


In [81]:
fact_pdf['white_percentage'] = (fact_pdf['white']/(fact_pdf['white']+fact_pdf['asian']+fact_pdf['black']+fact_pdf['hispanic']+fact_pdf['indian']) * 100).round(2)

In [82]:
fact_pdf.head(10)

Unnamed: 0,fact_id,state_code,immigration_count,white,asian,black,hispanic,indian,white_percentage,white_percentage[%]
0,34359738368,SC,9811,343764,13355,175064,29863,3705,60.76,60.76
1,34359738369,AZ,20218,3591611,229183,296222,1508157,129708,62.41,62.41
2,77309411328,LA,22655,654578,38739,602377,87133,8263,47.06,47.06
3,85899345920,MN,11194,1050239,151544,216731,103229,25242,67.89,67.89
4,137438953472,NJ,76531,615083,116844,452202,600437,11350,34.25,34.25
5,180388626432,DC,28228,285402,35072,328786,71129,6130,39.28,39.28
6,257698037760,OR,12574,1235819,117279,72150,201498,38597,74.21,74.21
7,360777252864,VA,31399,1428158,167784,771569,216760,26160,54.71,54.71
8,403726925824,RI,3289,287304,24245,55556,109226,6369,59.52,59.52
9,420906795008,KY,5790,705790,32667,202749,50478,7772,70.62,70.62


In [85]:
fact_pdf = fact_pdf.drop(columns=['white_percentage[%]'])

In [86]:
fact_pdf.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 49 entries, 0 to 48
Data columns (total 9 columns):
fact_id              49 non-null int64
state_code           49 non-null object
immigration_count    49 non-null int32
white                49 non-null int32
asian                49 non-null int32
black                49 non-null int32
hispanic             49 non-null int32
indian               49 non-null int32
white_percentage     49 non-null float64
dtypes: float64(1), int32(6), int64(1), object(1)
memory usage: 2.4+ KB


In [87]:
fact_pdf["white_percentage"].max()

86.82

In [97]:
fact_pdf[fact_pdf.white_percentage == fact_pdf.white_percentage.max()]

Unnamed: 0,fact_id,state_code,immigration_count,white,asian,black,hispanic,indian,white_percentage
26,781684047873,ND,1225,172068,5576,8177,5234,7142,86.82


In [112]:
fact_pdf[fact_pdf.white_percentage == fact_pdf.white_percentage.max()]['white']

26    172068
Name: white, dtype: int32

In [111]:
fact_pdf[fact_pdf.white_percentage == fact_pdf.white_percentage.max()]['white'].values[0]

172068

In [120]:
str(fact_pdf[fact_pdf.white_percentage == fact_pdf.white_percentage.max()]['state_code'].values[0])

'ND'

### 4-4. Figure out the state that has the least ratio of white people

In [89]:
fact_pdf["white_percentage"].min()

0.0

In [114]:
fact_pdf_2 = fact_pdf.replace(0, fact_pdf[fact_pdf.white_percentage == fact_pdf.white_percentage.max()]['white'].values[0])

In [117]:
fact_pdf_2["white_percentage"].min()

28.09

In [118]:
fact_pdf_2[fact_pdf_2.white_percentage == fact_pdf_2.white_percentage.min()]

Unnamed: 0,fact_id,state_code,immigration_count,white,asian,black,hispanic,indian,white_percentage
48,1709396983808,HI,168764,110508,240978,11781,24586,5592,28.09


In [150]:
str(fact_pdf_2[fact_pdf_2.white_percentage == fact_pdf_2.white_percentage.min()]['state_code'].values[0])

'HI'