#### 1. Explore and Assess the Data

#####  1-A Import the libraries

In [1]:
# declare necessary imports
import pandas as pd
import glob
import configparser
from datetime import datetime
import pyspark.sql.functions as F
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DateType
from pyspark.sql import SparkSession, SQLContext, GroupedData
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [2]:
#dedclaring an output folder- where the fact and dimension tables will be saved
output = "output"

In [3]:
#initiate spark session
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [4]:
from immigration_codes import country_udf

###  2. Explore the Immigration Data

 ####  2-A Read the Immigration Data

In [5]:
# read the immigration as a dataframe from the sas_data folder
path = "sas_data"
df_im = spark.read.parquet(path)

In [6]:
#see the datatype of the dataframe
df_im.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

 ####  2-B. Data Quality Issues of Immigration Data


 **Data Quality Issue-1**: Column names are not descriptive and therefore difficult to understand.    
    

In [7]:
#select the required columns and adjust the column headers to more desriptive ones
df_im = df_im.select(col("i94yr").alias("immigration_yr"),
                     col("i94mon").alias("immigration_month"),
                     col("i94res").alias("country_of_origin"),
                     col("i94addr").alias("state_code"),
                     col('biryear').alias("birth_year"),
                     col('gender')
                    )

In [8]:
#show the first five rows to see the new dataframe with the selected columns
df_im.show(3)

+--------------+-----------------+-----------------+----------+----------+------+
|immigration_yr|immigration_month|country_of_origin|state_code|birth_year|gender|
+--------------+-----------------+-----------------+----------+----------+------+
|        2016.0|              4.0|            438.0|        CA|    1976.0|     F|
|        2016.0|              4.0|            438.0|        NV|    1984.0|     F|
|        2016.0|              4.0|            438.0|        WA|    1987.0|     M|
+--------------+-----------------+-----------------+----------+----------+------+
only showing top 3 rows



In [10]:
#see the current state of the dataframe columns and the datatype
df_im.printSchema()

root
 |-- immigration_yr: double (nullable = true)
 |-- immigration_month: double (nullable = true)
 |-- country_of_origin: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- birth_year: double (nullable = true)
 |-- gender: string (nullable = true)



 **Data Quality Issue-2**: Some of the datatypes are not appropriate. 


In [11]:
#change the datatype of the dataframe
df_im = df_im.withColumn("immigration_yr", df_im["immigration_yr"].cast('int'))\
             .withColumn("immigration_month", df_im["immigration_month"].cast('int'))\
             .withColumn("country_of_origin", df_im["country_of_origin"].cast('int'))\
             .withColumn("birth_year", df_im["birth_year"].cast('int'))

In [12]:
#see the impact of the datatype conversion
df_im.printSchema()

root
 |-- immigration_yr: integer (nullable = true)
 |-- immigration_month: integer (nullable = true)
 |-- country_of_origin: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)



In [13]:
#see the first two rows 
df_im.show(3)

+--------------+-----------------+-----------------+----------+----------+------+
|immigration_yr|immigration_month|country_of_origin|state_code|birth_year|gender|
+--------------+-----------------+-----------------+----------+----------+------+
|          2016|                4|              438|        CA|      1976|     F|
|          2016|                4|              438|        NV|      1984|     F|
|          2016|                4|              438|        WA|      1987|     M|
+--------------+-----------------+-----------------+----------+----------+------+
only showing top 3 rows



 **Data Quality Issue-3**: We shall check if the dataframe has null and duplicate values. If there are null and duplicate values, we shall remove them

In [14]:
# count the number of rows and columns
df_im.count(), len(df_im.columns)

(3096313, 6)

In [15]:
#count the number of null values in the dataframe
df_null_count_new = df_im.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df_im.columns])

#show the total number of null values per column
df_null_count_new.show()

+--------------+-----------------+-----------------+----------+----------+------+
|immigration_yr|immigration_month|country_of_origin|state_code|birth_year|gender|
+--------------+-----------------+-----------------+----------+----------+------+
|             0|                0|                0|    152592|       802|414269|
+--------------+-----------------+-----------------+----------+----------+------+



**Note:** There are null values in the state_code, birth_year and the gender columns. We would like to get rid of all the rows with null values.

In [16]:
#remove null and duplicate values 
df_im = df_im.na.drop().dropDuplicates()

In [17]:
#count the number of null values in the dataframe
df_null_count_new = df_im.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df_im.columns])

#show the total number of null values per column- should return 0 as we have removed the null values
df_null_count_new.show()

+--------------+-----------------+-----------------+----------+----------+------+
|immigration_yr|immigration_month|country_of_origin|state_code|birth_year|gender|
+--------------+-----------------+-----------------+----------+----------+------+
|             0|                0|                0|         0|         0|     0|
+--------------+-----------------+-----------------+----------+----------+------+



In [18]:
#show the number of rows after we have removed the null and the duplicates
df_im.count()

262636

In [19]:
#show the 05 rows of the dataframe to observe the current status of the dataframe
df_im.show(5)

+--------------+-----------------+-----------------+----------+----------+------+
|immigration_yr|immigration_month|country_of_origin|state_code|birth_year|gender|
+--------------+-----------------+-----------------+----------+----------+------+
|          2016|                4|              251|        NY|      1985|     F|
|          2016|                4|              276|        MI|      1980|     M|
|          2016|                4|              276|        NY|      1964|     F|
|          2016|                4|              276|        NJ|      1945|     F|
|          2016|                4|              276|        NY|      1936|     M|
+--------------+-----------------+-----------------+----------+----------+------+
only showing top 5 rows



In [20]:
#import function from immigration_codes.py file
from immigration_codes import country_udf

In [21]:
#extract the name of the country from the country_of_origin column
df_im = df_im.withColumn("country_of_origin", country_udf(df_im["country_of_origin"]))

In [22]:
#show the 5 rows of the dataframe 
df_im.printSchema()

root
 |-- immigration_yr: integer (nullable = true)
 |-- immigration_month: integer (nullable = true)
 |-- country_of_origin: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)



In [23]:
#show the first five wors of the dataframe to see if the country_of_origin has been converted 
df_im.show(5)

+--------------+-----------------+-----------------+----------+----------+------+
|immigration_yr|immigration_month|country_of_origin|state_code|birth_year|gender|
+--------------+-----------------+-----------------+----------+----------+------+
|          2016|                4|           ISRAEL|        NY|      1985|     F|
|          2016|                4|      SOUTH KOREA|        MI|      1980|     M|
|          2016|                4|      SOUTH KOREA|        NY|      1964|     F|
|          2016|                4|      SOUTH KOREA|        NJ|      1945|     F|
|          2016|                4|      SOUTH KOREA|        NY|      1936|     M|
+--------------+-----------------+-----------------+----------+----------+------+
only showing top 5 rows



**Note:** Now the immigration data is ready to be used as we have cleaned it.

###  3. Explore the US Cities Demographics

####  3-A.  Read the US City Demographies Data

In [24]:
#dataframe for the US city demographies data
df_demo = spark.read.csv('us-cities-demographics.csv', header=True, mode="DROPMALFORMED", sep = ';')

In [25]:
#show the columns and the datatype of the dataframe columns 
df_demo.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



####  3-B.  Data Quality Issues

**Data Quality Issue-1:** There are inappropriate data types: for instance median age is string.

**Data Quality Issue-2:** There is space in the column headers name and mixture of capital and small letters which is at times seems difficult to read. 

We shall now handle all these data quality issues below.


In [26]:
#select the needed column required for our analysis- City, State, Median Age, Foreign-born, State Code and Race
#Change the names as appropriate
#change the data type for the state_code and the median_age

df_demo = df_demo.select(col("City").alias("city"),
                         col("State").alias("state"),
                         col("Median Age").alias("median_age").cast('float'),
                         col("Foreign-born").alias("foreign_born").cast('int'),
                         col("State Code").alias("state_code"),
                         col("Race").alias("race"),
                         )

In [27]:
# see the dataframe to observe the column header changes
df_demo.show(5)

+----------------+-------------+----------+------------+----------+--------------------+
|            city|        state|median_age|foreign_born|state_code|                race|
+----------------+-------------+----------+------------+----------+--------------------+
|   Silver Spring|     Maryland|      33.8|       30908|        MD|  Hispanic or Latino|
|          Quincy|Massachusetts|      41.0|       32935|        MA|               White|
|          Hoover|      Alabama|      38.5|        8229|        AL|               Asian|
|Rancho Cucamonga|   California|      34.5|       33878|        CA|Black or African-...|
|          Newark|   New Jersey|      34.6|       86253|        NJ|               White|
+----------------+-------------+----------+------------+----------+--------------------+
only showing top 5 rows



In [28]:
#see the impace of column header and data type changes
df_demo.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: float (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- race: string (nullable = true)



 **Data Quality Issue-3**: The dataframe has null values. We shall now remove the null values along with the duplicate values.

In [29]:
#count the number of null values in the dataframe
df_null_count_new = df_demo.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df_demo.columns])

#show the total number of null values per column
df_null_count_new.show()

+----+-----+----------+------------+----------+----+
|city|state|median_age|foreign_born|state_code|race|
+----+-----+----------+------------+----------+----+
|   0|    0|         0|          13|         0|   0|
+----+-----+----------+------------+----------+----+



In [30]:
# count the number of rows and columns
df_demo.count(), len(df_demo.columns)

(2891, 6)

In [31]:
#remove null and duplicate values 
df_demo = df_demo.na.drop().dropDuplicates()

In [32]:
#count the number of null values in the dataframe
df_null_count_new = df_demo.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df_demo.columns])

#show the total number of null values per column- should return 0 as we have dropped the null values
df_null_count_new.show()

+----+-----+----------+------------+----------+----+
|city|state|median_age|foreign_born|state_code|race|
+----+-----+----------+------------+----------+----+
|   0|    0|         0|           0|         0|   0|
+----+-----+----------+------------+----------+----+



In [33]:
#show the new dataframe 
df_demo.show(5)

+------------+-------------+----------+------------+----------+--------------------+
|        city|        state|median_age|foreign_born|state_code|                race|
+------------+-------------+----------+------------+----------+--------------------+
|        Lynn|Massachusetts|      34.8|       29899|        MA|               Asian|
| Gainesville|      Florida|      26.0|       15272|        FL|American Indian a...|
|  Springdale|     Arkansas|      31.8|       19969|        AR|  Hispanic or Latino|
| New Orleans|    Louisiana|      35.9|       21679|        LA|               White|
|Murfreesboro|    Tennessee|      30.2|        8948|        TN|               Asian|
+------------+-------------+----------+------------+----------+--------------------+
only showing top 5 rows



In [34]:
# count the number of rows and columns
df_demo.count(), len(df_demo.columns)

(2878, 6)

In [35]:
#show the name of the columns and the data type
df_demo.printSchema()

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: float (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- state_code: string (nullable = true)
 |-- race: string (nullable = true)



###  4. Define the Conceptual Data Model

#### 4-A Star Schema
##### Dimension Tables

+ immigration_table : year, month, country_of_origin, state_code, birth_year, gender

+ demographics_table : city, state, median_age, number_of_immigrants, state_code, & race



##### Fact Table

+ immigration_impact_table : state, new_immigrant_per_state, total_immigrants_per_state

#### 4.A Dimension Tables

#### 4.A.1 Dimension Table-1

+ immigration_table:  year, month, country_of_origin, state_code, birth_year, gender

In [36]:
#show the first 05 rows of the immigration table 
df_im.show(5)

+--------------+-----------------+-----------------+----------+----------+------+
|immigration_yr|immigration_month|country_of_origin|state_code|birth_year|gender|
+--------------+-----------------+-----------------+----------+----------+------+
|          2016|                4|           ISRAEL|        NY|      1985|     F|
|          2016|                4|      SOUTH KOREA|        MI|      1980|     M|
|          2016|                4|      SOUTH KOREA|        NY|      1964|     F|
|          2016|                4|      SOUTH KOREA|        NJ|      1945|     F|
|          2016|                4|      SOUTH KOREA|        NY|      1936|     M|
+--------------+-----------------+-----------------+----------+----------+------+
only showing top 5 rows



In [37]:
#extract immigration table from the df_im dataframe
immigration_table = df_im.selectExpr("immigration_yr as year",
                                     "immigration_month as month",
                                     "country_of_origin",
                                     "state_code",
                                     "birth_year", 
                                     'gender')

In [38]:
#show the dimension table immigration_table 
immigration_table.show(5)

+----+-----+-----------------+----------+----------+------+
|year|month|country_of_origin|state_code|birth_year|gender|
+----+-----+-----------------+----------+----------+------+
|2016|    4|           ISRAEL|        NY|      1985|     F|
|2016|    4|      SOUTH KOREA|        MI|      1980|     M|
|2016|    4|      SOUTH KOREA|        NY|      1964|     F|
|2016|    4|      SOUTH KOREA|        NJ|      1945|     F|
|2016|    4|      SOUTH KOREA|        NY|      1936|     M|
+----+-----+-----------------+----------+----------+------+
only showing top 5 rows



In [32]:
# write dimension table to parquet files partitioned by year and month
#immigration_table.write.partitionBy('state_code','year', 'month').parquet(os.path.join(output,'immigration_table'), 'overwrite')

#### 4.A.2 Dimension Table-2

+ demographics_table: city, state, median_age, number_of_immigrants, state_code, & race

In [39]:
#show the first 05 rows of the demographic table 
df_demo.show(5)

+------------+-------------+----------+------------+----------+--------------------+
|        city|        state|median_age|foreign_born|state_code|                race|
+------------+-------------+----------+------------+----------+--------------------+
|        Lynn|Massachusetts|      34.8|       29899|        MA|               Asian|
| Gainesville|      Florida|      26.0|       15272|        FL|American Indian a...|
|  Springdale|     Arkansas|      31.8|       19969|        AR|  Hispanic or Latino|
| New Orleans|    Louisiana|      35.9|       21679|        LA|               White|
|Murfreesboro|    Tennessee|      30.2|        8948|        TN|               Asian|
+------------+-------------+----------+------------+----------+--------------------+
only showing top 5 rows



In [40]:
#extract demographics_table from the df_demo dataframe
demographics_table = df_demo.selectExpr("city ",
                                       "state",
                                        "state_code",
                                        "median_age",
                                       "foreign_born as number_of_immigrants",
                                        "race"
                                       )

In [41]:
#show the first five rows of the demographics table
demographics_table.show(5)

+------------+-------------+----------+----------+--------------------+--------------------+
|        city|        state|state_code|median_age|number_of_immigrants|                race|
+------------+-------------+----------+----------+--------------------+--------------------+
|        Lynn|Massachusetts|        MA|      34.8|               29899|               Asian|
| Gainesville|      Florida|        FL|      26.0|               15272|American Indian a...|
|  Springdale|     Arkansas|        AR|      31.8|               19969|  Hispanic or Latino|
| New Orleans|    Louisiana|        LA|      35.9|               21679|               White|
|Murfreesboro|    Tennessee|        TN|      30.2|                8948|               Asian|
+------------+-------------+----------+----------+--------------------+--------------------+
only showing top 5 rows



In [36]:
# write dimension tables to parquet files partitioned by  state
#demographics_table.write.partitionBy('state').parquet(os.path.join(output,'demographics_table'), 'overwrite')

#### 4.A.3 Fact Table

+ immigration_impact_table : state, new_immigrant_per_state, total_immigrants_per_state

In [42]:
# count the number of new_immigrant_per_state
immigrant_per_state_df = immigration_table.groupby('state_code').count()

In [43]:
# show first 05 rows of the dataframe
immigrant_per_state_df.show(5)

+----------+-----+
|state_code|count|
+----------+-----+
|        .N|    3|
|        CI|    5|
|        TC|    1|
|        AZ| 5146|
|        SC| 3070|
+----------+-----+
only showing top 5 rows



In [44]:
#rename the dataframe column
immigrant_per_state_df_1 =  immigrant_per_state_df.select(col("state_code").alias("state_code_im"),
                                                          col("count").alias("new_immigrant_per_state"))

In [45]:
#show the renamed dataframe
immigrant_per_state_df_1.show(5)

+-------------+-----------------------+
|state_code_im|new_immigrant_per_state|
+-------------+-----------------------+
|           .N|                      3|
|           CI|                      5|
|           TC|                      1|
|           AZ|                   5146|
|           SC|                   3070|
+-------------+-----------------------+
only showing top 5 rows



In [48]:
# country total_immigrants_per_state
demo_count_df = demographics_table.groupby("state_code").sum(('number_of_immigrants'))

In [49]:
#show the total_immigrants_per_state table
demo_count_df_1 =  demo_count_df.select(col("state_code"),
                                        col("sum(number_of_immigrants)").alias("total_immigrants_per_state"))

In [50]:
#rename the dataframe column
demo_count_df_1.show(5)

+----------+--------------------------+
|state_code|total_immigrants_per_state|
+----------+--------------------------+
|        SC|                    134019|
|        AZ|                   3411565|
|        LA|                    417095|
|        MN|                   1069888|
|        NJ|                   2327750|
+----------+--------------------------+
only showing top 5 rows



In [51]:
#join the two table bases on the state_code
immigration_impact_table_1 = immigrant_per_state_df_1.join(demo_count_df_1, 
                                                           (demo_count_df_1.state_code == immigrant_per_state_df_1.state_code_im),'inner')

In [52]:
#show the dataframe created after joining the two dataframe based on state_code
immigration_impact_table_1.show(5)

+-------------+-----------------------+----------+--------------------------+
|state_code_im|new_immigrant_per_state|state_code|total_immigrants_per_state|
+-------------+-----------------------+----------+--------------------------+
|           AZ|                   5146|        AZ|                   3411565|
|           SC|                   3070|        SC|                    134019|
|           LA|                   5576|        LA|                    417095|
|           MN|                   4127|        MN|                   1069888|
|           NJ|                  10269|        NJ|                   2327750|
+-------------+-----------------------+----------+--------------------------+
only showing top 5 rows



In [53]:
#form the fact table with the required columns
immigration_impact_table = immigration_impact_table_1.select(col("state_code"),
                                                             col("new_immigrant_per_state"),
                                                             col("total_immigrants_per_state"))

In [54]:
#show the fact table
immigration_impact_table.show(5)

+----------+-----------------------+--------------------------+
|state_code|new_immigrant_per_state|total_immigrants_per_state|
+----------+-----------------------+--------------------------+
|        AZ|                   5146|                   3411565|
|        SC|                   3070|                    134019|
|        LA|                   5576|                    417095|
|        MN|                   4127|                   1069888|
|        NJ|                  10269|                   2327750|
+----------+-----------------------+--------------------------+
only showing top 5 rows



In [55]:
# count the number of rows and columns in the fact table
immigration_impact_table.count(), len(immigration_impact_table.columns)

(48, 3)

In [56]:
# show the columns and the data type of the fact table
immigration_impact_table.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- new_immigrant_per_state: long (nullable = false)
 |-- total_immigrants_per_state: long (nullable = true)



In [None]:
# write fact tables to parquet files partitioned by state_code
#immigration_impact_table.write.partitionBy('state_code').parquet(os.path.join(output,'immigration_impact_table'), 'overwrite')

In [57]:
#emprty the output folder 
import shutil
shutil.rmtree('output/demographics_table')
shutil.rmtree('output/immigration_impact_table')
shutil.rmtree('output/immigration_table')

+ Note: Fact and dimension tables have been written in the output folder. However, it creates problem while submitting this large number of files and therefore the folders have been made empty. The command lines for writing the files have been commented so that it does not take long time while running this entire file. The fact and dimension tables can be created and saved by uncommenting and running the appropriate lines. 