# US Immigration and Average Temperature Analytics
### Data Engineering Capstone Project

#### Project Summary
This project uses the I94 immigration data as its primary dataset, together with a global dataset of city temperatures. Using these datasets, I model the data by location, time and immigration record to obtain the average temperature of the designated city. The DataFrames are processed with Spark, where the aggregation operations are performed. After dimensional modeling, the resulting fact table is partitioned and written to Parquet files.

The project follows these steps:
* Step 1: Read the given data frames
* Step 2: Exploratory analysis on missing values and aggregation
* Step 3: Defining the Data Dimensional Model
* Step 4: Run ETL to Model the Data

In [1]:
# Do all imports and installs here
import pandas as pd

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [None]:
# Read in the data here
fname='../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")



In [3]:
df.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

In [3]:
df.head()

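|   | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 |  |  |  | ... | U |  | 1979.0 | 10282016 |  |  |  | 1897628000.0 |  | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL |  | ... | Y |  | 1991.0 | D/S | M |  |  | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... |  | M | 1961.0 | 09302016 | M |  | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 1988.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... |  | M | 2012.0 | 09302016 |  |  | AA | 92468460000.0 | 199.0 | B2 |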


In [47]:
df['i94yr'].drop_duplicates()

0    2016.0
Name: i94yr, dtype: float64

In [5]:
	
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, from_unixtime, monotonically_increasing_id

# Load the spark-sas7bdat package so Spark can read the SAS file directly
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [27]:
#write to parquet
#df_spark.write.parquet("sas_data")
#df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [12]:
# Performing cleaning tasks here
df_img=df_spark[['cicid','arrdate','i94addr','visatype']]

In [7]:
#check for nulls 
df_img.where(col("i94addr").isNull()).count() #152592 null values

#df_img.where(col("cicid").isNull()).count()  # 0

#df_img.where(col("visatype").isNull()).count() 

152592

In [8]:
#dropping rows with nulls: the analysis is by region, so rows without a state (i94addr) are not useful
#note: na.drop() with no arguments removes rows that have a null in any of the selected columns
df_imgnew=df_img.na.drop()

In [9]:
df_imgnew.where(col('i94addr').isNull()).count()

0

In [10]:
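#converting arrdate to a readable date
#NOTE: arrdate appears to be a SAS date number (days since 1960-01-01), not epoch milliseconds,
#so this conversion maps every row to 1970-01-01 (see the output below)
#df_spark.withColumn('arrdate',F.date_format(df_img.epoch.cast(dataType=t)))
df_imgnew=df_imgnew.withColumn('arrdate',from_unixtime(df_imgnew.arrdate/1000,"yyyy-MM-dd"))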

In [122]:
#cleaned dataframe with arrival date in readable format and no null values
df_imgnew.show()

+-----+----------+-------+--------+
|cicid|   arrdate|i94addr|visatype|
+-----+----------+-------+--------+
|  7.0|1970-01-01|     AL|      F1|
| 15.0|1970-01-01|     MI|      B2|
| 16.0|1970-01-01|     MA|      B2|
| 17.0|1970-01-01|     MA|      B2|
| 18.0|1970-01-01|     MI|      B1|
| 19.0|1970-01-01|     NJ|      B2|
| 20.0|1970-01-01|     NJ|      B2|
| 21.0|1970-01-01|     NY|      B2|
| 22.0|1970-01-01|     NY|      B1|
| 23.0|1970-01-01|     NY|      B2|
| 24.0|1970-01-01|     MO|      B2|
| 27.0|1970-01-01|     MA|      B1|
| 28.0|1970-01-01|     MA|      B1|
| 29.0|1970-01-01|     MA|      B2|
| 30.0|1970-01-01|     NJ|      B2|
| 31.0|1970-01-01|     NY|      B2|
| 33.0|1970-01-01|     TX|      B2|
| 34.0|1970-01-01|     CT|      B2|
| 35.0|1970-01-01|     CT|      B2|
| 36.0|1970-01-01|     NJ|      B2|
+-----+----------+-------+--------+
only showing top 20 rows
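
The raw `arrdate` values shown earlier (for example 20545.0 and 20573.0) look like SAS date numbers, which count days from 1960-01-01 rather than milliseconds from the Unix epoch. Below is a minimal plain-Python sketch of that conversion, assuming this interpretation is correct. `sas_days_to_date` is a hypothetical helper and not part of the notebook.

```python
from datetime import date, timedelta

# Assumption: arrdate is a SAS date number, i.e. days elapsed since 1960-01-01.
SAS_EPOCH = date(1960, 1, 1)

def sas_days_to_date(days):
    """Convert a SAS date number (days since 1960-01-01) to a datetime.date."""
    return SAS_EPOCH + timedelta(days=int(days))

# Raw values taken from the df.head() output earlier in the notebook.
for raw in (20545.0, 20573.0):
    print(raw, "->", sas_days_to_date(raw).isoformat())
```

Under this assumption both values fall in April 2016, which is consistent with the `i94_apr16_sub` file name. In Spark, the same idea might be expressed with `F.expr("date_add(to_date('1960-01-01'), cast(arrdate as int))")`, though that has not been run against this dataset here.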



In [23]:
df_cities=pd.read_csv('us-cities-demographics.csv',delimiter=';')
df_cities.head()

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


In [77]:
#df_cities[df_cities['State']=='Massachusetts']

In [14]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)


In [28]:
#df_temp['dt'].drop_duplicates()
df_tempus=df_temp[ (df_temp['Country']=='United States')]
df_tempusday=df_tempus[(df_tempus['dt']=='1970-01-01')]

In [31]:
#inner join of city demographics with that day's temperatures, matched on City
temp_dp=pd.merge(left=df_cities,right=df_tempusday,left_on='City',right_on='City',how='inner')

In [55]:
#cleaning redundant columns
tempdf=temp_dp[['City','State Code','dt','AverageTemperature']]

In [48]:
#taking avg temp by city, state and date
avgtempdf=tempdf.groupby(['City','State Code','dt']).mean()

In [60]:
#reading it in spark
temp=spark.createDataFrame(tempdf)

In [63]:
#aggregating spark dataframe
avgtemp=temp.groupby(['City','State Code','dt']).mean()

In [70]:
avgtemp=avgtemp.withColumnRenamed("avg(AverageTemperature)","Avg Temperature")

In [71]:
#checking for null values:
avgtemp.where(col('Avg Temperature').isNull()).count()

0

In [88]:
avgtemp=avgtemp.withColumn('Tempid',monotonically_increasing_id())

In [96]:
avgtemp.printSchema()

root
 |-- City: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- dt: string (nullable = true)
 |-- Avg Temperature: double (nullable = true)
 |-- Tempid: long (nullable = false)

In [154]:
avgtemp=avgtemp.withColumnRenamed('State Code','State')

In [171]:
avgtemp=avgtemp.withColumnRenamed('Avg Temperature','AvgTemp')

In [172]:
avgtemp

DataFrame[City: string, State: string, dt: string, AvgTemp: double, Tempid: bigint]
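
The `Tempid` values produced by `monotonically_increasing_id()` are unique and increasing, but not consecutive. According to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The sketch below reproduces that arithmetic in plain Python. `simulated_id` is a hypothetical helper for illustration, not a Spark function.

```python
def simulated_id(partition_id, row_in_partition):
    """Mimic the documented bit layout: partition ID shifted past the lower 33 bits."""
    return (partition_id << 33) + row_in_partition

# Two partitions with three rows each.
for partition_id in range(2):
    for row in range(3):
        print(partition_id, row, simulated_id(partition_id, row))
```

This is why `Tempid` can serve as a surrogate key, while it should not be assumed to be gap-free.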

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [75]:
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, monotonically_increasing_id

In [173]:
#Dimension Tables - Time, Region, Immigration


time= avgtemp.select(
                    'dt',
                    dayofmonth('dt').alias('day'),
                    weekofyear('dt').alias('week'),
                    month('dt').alias('month'),
                    year('dt').alias('year'),
                    date_format('dt','E').alias('weekday'))


Region = avgtemp.select(
                    'Tempid',
                    'City',
                    'State')

Immigration = df_imgnew.select(
                    'cicid',
                    'visatype',
                    'i94addr')
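
To make the Time dimension concrete, this plain-Python sketch derives the same attributes for a single date string. It uses only the standard library rather than Spark, and `time_row` is a hypothetical helper. It assumes that Spark's `weekofyear` follows ISO 8601 week numbering and that `date_format(..., 'E')` yields the abbreviated English day name.

```python
from datetime import datetime

# Abbreviated day names, indexed by date.weekday() (Monday == 0).
WEEKDAYS = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")

def time_row(dt):
    """Build one Time-dimension row from a 'yyyy-MM-dd' string."""
    d = datetime.strptime(dt, "%Y-%m-%d").date()
    return {
        "dt": dt,
        "day": d.day,                 # day of month
        "week": d.isocalendar()[1],   # ISO 8601 week number
        "month": d.month,
        "year": d.year,
        "weekday": WEEKDAYS[d.weekday()],
    }

print(time_row("1970-01-01"))
```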



In [176]:
time=time.withColumn('timeid',monotonically_increasing_id())

In [177]:
Region=Region.withColumnRenamed('State Code','State')

In [178]:
# Creating views for data model
df_imgnew.createOrReplaceTempView('Immigration')
Region.createOrReplaceTempView('Region')
avgtemp.createOrReplaceTempView('AverageTemp')
time.createOrReplaceTempView('Time')


In [179]:
F_Avgtemp=spark.sql("""
SELECT i.cicid    AS Img_id,
       a.Tempid   AS Temp_id,
       t.timeid   AS time_id,
       i.visatype AS VisaType,
       a.State    AS State,
       a.City     AS City,
       a.AvgTemp  AS AvgTemp
FROM Immigration i
JOIN Time t
  ON i.arrdate = t.dt
JOIN AverageTemp a
  ON i.i94addr = a.State
""")
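
Because the fact query joins `Immigration` to `AverageTemp` on state alone, each arrival is paired with every city row in that state. The plain-Python sketch below illustrates that row multiplication. The sample rows are assumptions made up for illustration (the temperature values and the second Massachusetts city are not taken from the datasets), and `join_on_state` is a hypothetical helper.

```python
# Hypothetical sample rows, for illustration only.
arrivals = [
    {"cicid": 7.0, "i94addr": "AL"},
    {"cicid": 16.0, "i94addr": "MA"},
]
city_temps = [
    {"State": "AL", "City": "Hoover", "AvgTemp": 5.0},
    {"State": "MA", "City": "Quincy", "AvgTemp": -2.0},
    {"State": "MA", "City": "Boston", "AvgTemp": -1.5},
]

def join_on_state(arrivals, city_temps):
    """Inner join of arrivals to city temperatures on state code only."""
    return [
        {"Img_id": a["cicid"], "State": c["State"],
         "City": c["City"], "AvgTemp": c["AvgTemp"]}
        for a in arrivals
        for c in city_temps
        if a["i94addr"] == c["State"]
    ]

fact_rows = join_on_state(arrivals, city_temps)
for row in fact_rows:
    print(row)
```

Here the single Massachusetts arrival yields two fact rows, one per city. The same effect would apply to the join with `Time` wherever `dt` values repeat.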

In [None]:
F_Avgtemp.write.mode("append").partitionBy("State","City").parquet("FactTemp")
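
`partitionBy("State","City")` writes Hive-style partition directories, with one `column=value` folder level per partition column. The sketch below only builds the expected directory name in plain Python. `partition_dir` is a hypothetical helper, and the state and city values are examples.

```python
import posixpath

def partition_dir(base, partitions):
    """Build a Hive-style partition path such as base/col1=a/col2=b."""
    parts = ["{}={}".format(column, value) for column, value in partitions]
    return posixpath.join(base, *parts)

print(partition_dir("FactTemp", [("State", "MA"), ("City", "Quincy")]))
```

Since the write uses `mode("append")`, rerunning the cell adds new files alongside the existing ones rather than replacing them.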

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here

def run_data_checks(table):
    if table.count() <= 0 :
        print("Data Quality check failed for {}".format(table))
    else:
        print("Data Quality check passed for {}".format(table))
    return None

run_data_checks(F_Avgtemp) #passed
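
A slightly stricter variant of the check above, sketched under the assumption that a failed check should stop the pipeline rather than only print a message. It works with any object exposing a `count()` method, such as a Spark DataFrame. `check_not_empty` and the `FakeTable` stand-in are hypothetical names, used so the example runs without Spark.

```python
def check_not_empty(table, name):
    """Raise if the table has no rows; otherwise return the row count."""
    n = table.count()
    if n <= 0:
        raise ValueError("Data quality check failed for {}: no rows".format(name))
    print("Data quality check passed for {} with {} rows".format(name, n))
    return n

class FakeTable:
    """Stand-in exposing the same count() interface as a Spark DataFrame."""
    def __init__(self, rows):
        self._rows = rows

    def count(self):
        return len(self._rows)

check_not_empty(FakeTable([("a",), ("b",)]), "FactTemp")
```

Passing the table name explicitly keeps the message readable, since formatting a DataFrame object into the string prints its schema rather than a name.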

#### 4.3 Data dictionary 

##### Dimension Tables :

##### Immigration  
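Cicid : Immigration ID

Visatype : Type of Visa

I94addr : State code used as the region of the arrival

##### Time
Timeid : Unique id for time

Dt : Date

Day : Day of month

Week : Week of year

Month : Month

Year : Year

Weekday : Abbreviated day of week

##### Region
Tempid : Unique id of the average temperature record

State : State in US

City  : Cities in US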

##### Fact Table

##### F_AvgTemp
Img_id : Immigration id (cicid)

Temp_id : Average temperature record id (Tempid)

Time_id : Time id (timeid)

VisaType : Type of Visa

State : State

City : City

AvgTemp : Average temperature of city

#### Conclusion and scope

In this project we modeled the immigration data together with time and location dimensions. The result is a fact table that gives the average temperature of the city for the date on which the immigration took place.

##### Further Scope :

* An EMR cluster could be used to analyze the data, given its large volume.
* The data could be loaded into AWS Redshift in the form of tables.
* Additional information on airport codes could be merged in to get information about the flights.