# Project Title
### Data Engineering Capstone Project

#### Project Summary
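This project gathers the I94 immigration data for April 2016 and a global land temperature dataset by city, cleans them, and models them into a star schema (one fact table and two dimension tables) in order to check whether the average temperature of a US destination city is correlated with the number of visitors it receives.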

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import functions as func
from pyspark.sql import SparkSession

### Step 1: Scope the Project and Gather Data

#### Scope 
In this project, we gather two datasets (immigration and temperature) and check the correlation between the number of arrivals at each US city and that city's average temperature.

#### Describe and Gather Data 
The immigration dataset comes from the US National Tourism and Trade Office, and the temperature dataset comes from Kaggle.
The datasets are described below.

#### Immigration data
* i94yr: 4-digit year
* i94mon: numeric month
* i94cit: 3-digit numeric code of the origin country
* i94port: 3-character code of the destination city
* arrdate: date of arrival in the USA
* depdate: date of departure from the USA
* i94visa: purpose of the visit (1 = Business, 2 = Pleasure, 3 = Student)
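In the I94 files, arrdate and depdate are SAS numeric dates, i.e. the number of days since 1960-01-01. For example, the value 20545.0 that appears in the sample rows corresponds to 2016-04-01, the first day of the April 2016 file. The sketch below converts such values to Python dates; `sas_to_date` is a hypothetical helper, not part of the original notebook.

```python
from datetime import date, timedelta
from typing import Optional

# SAS stores dates as the number of days since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS numeric date (e.g. arrdate, depdate) to a datetime.date.

    Hypothetical helper for illustration; missing values (None) stay None.
    """
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(None))     # None
```

Inside Spark, the same conversion could probably be expressed with the SQL function `date_add`, e.g. `func.expr("date_add(to_date('1960-01-01'), CAST(arrdate AS INT))")`; treat that expression as an untested sketch.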

In [3]:
# Build the Spark session with the SAS reader (spark-sas7bdat) and hadoop-aws packages
spark = SparkSession.builder\
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0,saurfang:spark-sas7bdat:2.0.0-s_2.11")\
        .enableHiveSupport().getOrCreate()

In [4]:
df_immig = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [5]:
# Immigration data has 3096313 records
df_immig.count()

3096313

In [6]:
df_immig.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)

In [7]:
# Displays first 5 records
df_immig.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

#### Temperature data
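* dt: date of the monthly record (first day of the month)
* AverageTemperature: average land temperature for the month, in degrees Celsius
* AverageTemperatureUncertainty: uncertainty of the average temperature
* City: city name
* Country: country name
* Latitude: latitude, e.g. 57.05N
* Longitude: longitude, e.g. 10.33E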
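Latitude and Longitude are stored as strings with a hemisphere suffix (for example 57.05N or 79.47W). If numeric coordinates were ever needed, for instance to compute distances, a small parser could map them to signed decimal degrees, treating S and W as negative. This is a hypothetical helper; the notebook itself keeps the original strings.

```python
def parse_coordinate(value):
    """Convert a coordinate string like '79.47W' into signed decimal degrees."""
    if value is None or value == "":
        return None
    hemisphere = value[-1].upper()
    if hemisphere not in ("N", "S", "E", "W"):
        raise ValueError(f"Unexpected hemisphere in {value!r}")
    number = float(value[:-1])
    # South and West are negative by convention
    return -number if hemisphere in ("S", "W") else number


print(parse_coordinate("57.05N"))  # 57.05
print(parse_coordinate("79.47W"))  # -79.47
```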

In [8]:
df_temp = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

In [9]:
# Temperature data has 8599212 records
df_temp.count()

8599212

In [10]:
df_temp.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [11]:
# Displays first 5 records
df_temp.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



### Step 2: Explore and Assess the Data
#### Explore the Data 
The data needs to be reorganized in a few ways:
1. Convert the SAS label description file into dictionaries
2. Create a column shared by both datasets (i94port)
3. Extract country names from the i94cit column

In [12]:
# Read the SAS label description file, which maps codes to city names, country names, etc.
with open('I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()


# Convert one value block of the SAS label file (e.g. i94prtl) into a dictionary
def code_mapper(file, idx):
    # Keep the text from the block name up to the ';' that closes the block
    block = file[file.index(idx):]
    block = block[:block.index(';')].split('\n')
    block = [line.replace("'", "") for line in block]
    # Every remaining line has the form: code = label
    pairs = [line.split('=') for line in block[1:]]
    return {pair[0].strip(): pair[1].strip() for pair in pairs if len(pair) == 2}
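To see what `code_mapper` returns, the sketch below runs the same parsing logic on a short, made-up excerpt that assumes the layout of the label file: a line naming the block, one `code = 'label'` pair per line, and a terminating `;`. The excerpt is illustrative and not copied from `I94_SAS_Labels_Descriptions.SAS`.

```python
# Made-up excerpt in the assumed layout of the SAS label file
SAMPLE = """
value i94prtl
   'ATL' = 'ATLANTA, GA'
   'NYC' = 'NEW YORK, NY'
   'XXX' = 'NOT REPORTED/UNKNOWN'
;
value i94cntyl
   103 = 'AUSTRIA'
   692 = 'ECUADOR'
;
"""


def code_mapper(file, idx):
    # Keep the text from the block name up to the ';' that closes the block
    block = file[file.index(idx):]
    block = block[:block.index(';')].split('\n')
    block = [line.replace("'", "") for line in block]
    # Every remaining line has the form: code = label
    pairs = [line.split('=') for line in block[1:]]
    return {pair[0].strip(): pair[1].strip() for pair in pairs if len(pair) == 2}


ports = code_mapper(SAMPLE, "i94prtl")
countries = code_mapper(SAMPLE, "i94cntyl")
print(ports)      # {'ATL': 'ATLANTA, GA', 'NYC': 'NEW YORK, NY', 'XXX': 'NOT REPORTED/UNKNOWN'}
print(countries)  # {'103': 'AUSTRIA', '692': 'ECUADOR'}
```

Note that the country codes come back as strings, which is why the next cell converts them with `int(key)`.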

In [13]:
i94prtl = code_mapper(f_content, "i94prtl") # city name dictionary
# i94prtl # Check what it looks like

In [14]:
i94cntyl = code_mapper(f_content, "i94cntyl") # country name dictionary
# i94cntyl # Check what it looks like

In [15]:
'''
The dictionaries above still need some cleanup:

Values of the city name dictionary also contain the state after a comma, which is not needed.

Keys of the country name dictionary are numbers stored as strings, and the values are in CAPITAL LETTERS.
'''

# Map each city name (state removed) back to its i94port code
code_dict = {}
for key, value in i94prtl.items():
    fields = value.split(', ')
    code_dict[fields[0]] = key

# Map each integer country code to a name with only the first letter capitalized
country_dict = {}
for key, value in i94cntyl.items():
    lower = value[1:].lower()
    country = value[0] + lower
    country_dict[int(key)] = country
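The country names are normalized by keeping the first character and lower-casing the rest. That turns "ECUADOR" into "Ecuador", but a multi-word name such as "UNITED KINGDOM" becomes "United kingdom". If the names are meant for display, or for matching against Title Case values like the temperature data's Country column, Python's `str.title()` may be a better fit, although it has its own quirks (for example around apostrophes). The sketch compares the two and also shows the city extraction used for `code_dict`; the helper names and sample labels are illustrative.

```python
def first_letter_only(name):
    # Approach used in the notebook: keep the first character, lower-case the rest
    return name[0] + name[1:].lower()


def title_case(name):
    # Alternative: capitalize every word
    return name.title()


def city_from_port_label(label):
    # Assumes port labels of the form "CITY, ST"; keep only the city part
    return label.split(', ')[0]


for raw in ["ECUADOR", "UNITED KINGDOM"]:
    print(first_letter_only(raw), "|", title_case(raw))
# Ecuador | Ecuador
# United kingdom | United Kingdom

print(city_from_port_label("NEW YORK, NY"))  # NEW YORK
```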

In [16]:
# Helper functions: look up the i94port code for a city name,
# and the country name for a numeric i94cit code (None if not found)
def lookupCode(city):
    return code_dict.get(city)

def lookupCountry(i94cit):
    return country_dict.get(i94cit)

# Wrap them as Spark UDFs (the return type defaults to StringType)
lookupCodeUDF = func.udf(lookupCode)
lookupCountryUDF = func.udf(lookupCountry)
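The i94cit column arrives in Spark as a double (for example 692.0), while `country_dict` uses integer keys. The lookup still works because in Python a float that equals an int also has the same hash, so 692.0 finds the key 692. The sketch below demonstrates this with an illustrative two-entry dictionary; `dict.get` returns None for unknown or null codes, which the UDF turns into a null value.

```python
# Illustrative subset of country_dict: integer code -> country name
country_dict = {103: "Austria", 692: "Ecuador"}


def lookup_country(i94cit):
    # dict.get returns None for missing keys instead of raising KeyError
    return country_dict.get(i94cit)


print(lookup_country(692.0))  # Ecuador  (692.0 == 692 and hash(692.0) == hash(692))
print(lookup_country(999.0))  # None
print(lookup_country(None))   # None
```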

In [17]:
# Add i94port column to temperature dataset
df_temp = df_temp.withColumn("i94port", lookupCodeUDF(func.upper(func.col("City"))))
df_temp.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+-------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|i94port|
+----------+------------------+-----------------------------+-----+-------+--------+---------+-------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|   null|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|   null|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|   null|
+----------+------------------+-----------------------------+-----+-------+--------+---------+-------+
only showing top 5 rows



In [18]:
# Add country's name column to immigration dataset
df_immig = df_immig.withColumn('origin_country', lookupCountryUDF(func.col('i94cit')))
df_immig.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+--------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|origin_country|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+--------------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|       Ecuador|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0| 

In [19]:
'''
We need only one temperature value per city.

Picking one monthly record arbitrarily would not be representative,

so we use the average temperature of each city over the entire period.
'''
average = df_temp.groupBy('City').agg(func.round(func.avg("AverageTemperature"), 2).alias("average_temp"))
average.show(5)

+---------+------------+
|     City|average_temp|
+---------+------------+
|  Antwerp|        9.88|
| Araruama|       23.79|
|Bangalore|       24.86|
|    Benxi|        7.21|
|Cajamarca|       16.89|
+---------+------------+
only showing top 5 rows
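Spark's `avg` skips nulls rather than counting them as zero, so the many early months with a missing AverageTemperature (like the Århus rows above) do not distort the result. Note also that the cell groups by City only, so same-named cities in different countries are averaged together before the later filter to the United States. The plain-Python sketch below mimics `avg` on made-up readings and compares grouping by City with grouping by (City, Country); in PySpark the second variant would correspond to `groupBy('City', 'Country')` and joining on both columns.

```python
from collections import defaultdict

# Made-up monthly readings: (City, Country, AverageTemperature or None)
readings = [
    ("Birmingham", "United Kingdom", 9.0),
    ("Birmingham", "United Kingdom", None),
    ("Birmingham", "United States", 17.0),
    ("Birmingham", "United States", 19.0),
]


def average_by(readings, key):
    """Average the non-null temperatures per group, like Spark's avg().

    Groups whose values are all null are omitted here (Spark would return null).
    """
    totals = defaultdict(lambda: [0.0, 0])
    for city, country, temp in readings:
        if temp is None:  # nulls are skipped, not counted as 0
            continue
        group = key(city, country)
        totals[group][0] += temp
        totals[group][1] += 1
    return {group: total / count for group, (total, count) in totals.items()}


# Grouping by City only mixes both cities: (9 + 17 + 19) / 3 = 15.0
print(average_by(readings, lambda city, country: city))
# Grouping by (City, Country) keeps them apart: 9.0 and 18.0
print(average_by(readings, lambda city, country: (city, country)))
```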



In [20]:
# Join the average table back to the original one on the City column
df_temp = df_temp.alias('table1').join(average.alias('table2'), (func.col("table1.City") == func.col("table2.City")))
df_temp.show(5)

+----------+------------------+-----------------------------+-------+-------+--------+---------+-------+-------+------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|   City|Country|Latitude|Longitude|i94port|   City|average_temp|
+----------+------------------+-----------------------------+-------+-------+--------+---------+-------+-------+------------+
|1743-11-01|              7.52|           1.6569999999999998|Antwerp|Belgium|  50.63N|    3.80E|   null|Antwerp|        9.88|
|1743-12-01|              null|                         null|Antwerp|Belgium|  50.63N|    3.80E|   null|Antwerp|        9.88|
|1744-01-01|              null|                         null|Antwerp|Belgium|  50.63N|    3.80E|   null|Antwerp|        9.88|
|1744-02-01|              null|                         null|Antwerp|Belgium|  50.63N|    3.80E|   null|Antwerp|        9.88|
|1744-03-01|              null|                         null|Antwerp|Belgium|  50.63N|    3.80E|   null|Antwerp|      

#### Cleaning Steps
Find empty, duplicated, or irrelevant records and remove them.

In [21]:
# The immigration table only covers arrivals in the USA, so keep only US cities in the temperature table
df_temp = df_temp.filter(df_temp.Country == 'United States')

# Drop records whose i94port is null, because the two tables are joined on this column
df_temp = df_temp.filter(df_temp.i94port.isNotNull())
df_immig = df_immig.filter(df_immig.i94port.isNotNull())

# Drop records whose i94port is 'XXX' (unreported/unknown port)
df_immig = df_immig.filter(df_immig.i94port != 'XXX')

# Remove duplicates: one row per city in the temperature table, one row per cicid in the immigration table
df_temp = df_temp.dropDuplicates(['City'])
df_immig = df_immig.dropDuplicates(['cicid'])
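The immigration cleaning rules can be sketched on plain Python records to make their order explicit: drop rows with a missing or 'XXX' port, then keep one row per cicid. The records and the helper name `clean_immigration` are hypothetical. Two Spark details differ from this sketch: `dropDuplicates` keeps an arbitrary row per key rather than the first one, and in Spark SQL a comparison such as `i94port != 'null'` evaluates to null for a null port, so `filter` drops that row as well; `isNotNull()` states the intent directly.

```python
def clean_immigration(records):
    """Hypothetical plain-Python version of the immigration cleaning rules."""
    seen = set()
    cleaned = []
    for row in records:
        port = row.get("i94port")
        if port is None or port == "XXX":  # missing or unknown port
            continue
        if row["cicid"] in seen:  # keep one row per cicid
            continue
        seen.add(row["cicid"])
        cleaned.append(row)
    return cleaned


records = [
    {"cicid": 6.0, "i94port": "XXX"},
    {"cicid": 7.0, "i94port": "ATL"},
    {"cicid": 7.0, "i94port": "ATL"},
    {"cicid": 8.0, "i94port": None},
    {"cicid": 9.0, "i94port": "NYC"},
]
print([row["cicid"] for row in clean_immigration(records)])  # [7.0, 9.0]
```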

In [22]:
# Final immigration table
df_immig.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|origin_country|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+--------------+
|299.0|2016.0|   4.0| 103.0| 103.0|    NYC|20545.0|    1.0|     NY|20550.0|  54.0|    2.0|  1.0|20160401|    null| null|      O|      O|   null|      M| 1962.0|06292016|  null|  null|     OS|5.5425872433E10|00087|      WT|       Austria|
|305.0|2016.0|   4.0| 103.0| 103.0|    NYC|20545

In [23]:
# Final temperature table
df_temp.show(5)

+----------+------------------+-----------------------------+----------+-------------+--------+---------+-------+----------+------------+
|        dt|AverageTemperature|AverageTemperatureUncertainty|      City|      Country|Latitude|Longitude|i94port|      City|average_temp|
+----------+------------------+-----------------------------+----------+-------------+--------+---------+-------+----------+------------+
|1743-11-01|            13.918|                        2.234|Charleston|United States|  32.95N|   79.47W|    CHS|Charleston|        18.7|
|1835-01-01|            11.544|                        2.167|   Phoenix|United States|  32.95N|  112.02W|    PHO|   Phoenix|       21.05|
|1743-11-01|            15.164|                        2.315|  Savannah|United States|  31.35N|   81.05W|    SAV|  Savannah|       19.41|
|1758-03-01|             2.512|                        3.833|     Omaha|United States|  40.99N|   95.86W|    OMA|     Omaha|       10.05|
|1828-01-01|           -17.738|   

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
<center>
  <img
    src="erd.png"
    width="700"
    height="1400"
  />
</center>

#### 3.2 Mapping Out Data Pipelines
1. Gather data from sources
2. Clean the data
3. Select the needed columns from the temperature dataset to create the dimension temperature table
4. Select the needed columns from the immigration dataset to create the dimension demographic table
5. Join the two datasets on i94port, select columns, and create the fact immigration table
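Step 5 can be illustrated in plain Python. The sketch mirrors the fact-table SQL below: an inner join of immigration rows to temperature rows on i94port, followed by a sequential immig_id assigned in port order, like `row_number() OVER (ORDER BY i94port)`. The rows are illustrative and the function name `build_fact_table` is hypothetical.

```python
# Illustrative rows: temperature averages keyed by i94port, plus immigration records
temperature = {"ABQ": 11.14, "ANC": -2.3}
immigration = [
    {"cicid": 4310664.0, "i94port": "ANC", "i94visa": 1.0},
    {"cicid": 5289835.0, "i94port": "ABQ", "i94visa": 3.0},
    {"cicid": 1234.0, "i94port": "ZZZ", "i94visa": 2.0},  # no temperature row, dropped
]


def build_fact_table(immigration, temperature):
    # Inner join on i94port: keep only ports that have a temperature record
    joined = [
        {**row, "average_temp": temperature[row["i94port"]]}
        for row in immigration
        if row["i94port"] in temperature
    ]
    # Number the rows after sorting by port, like row_number() OVER (ORDER BY i94port)
    joined.sort(key=lambda row: row["i94port"])
    return [{"immig_id": i, **row} for i, row in enumerate(joined, start=1)]


for fact in build_fact_table(immigration, temperature):
    print(fact["immig_id"], fact["i94port"], fact["cicid"], fact["average_temp"])
# 1 ABQ 5289835.0 11.14
# 2 ANC 4310664.0 -2.3
```

One design note: in Spark, a window with ORDER BY but no PARTITION BY moves all rows into a single partition to number them. For much larger data, `pyspark.sql.functions.monotonically_increasing_id()` gives unique (but not consecutive) IDs without that bottleneck.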

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [24]:
# Create temporary views
df_immig.createOrReplaceTempView('immigration')
df_temp.createOrReplaceTempView('temperature')

# Create dimension demographic table
dim_demographic = df_immig.select('cicid', 'i94port', 'i94yr', 'i94mon', 'i94cit', 'arrdate', 'depdate', 'i94visa', 'origin_country')


# Create dimension temperature table
dim_temperature = df_temp.select('i94port', 'table1.City', 'Country', 'Latitude', 'Longitude', 'average_temp')


# Create fact immigration table
fact_immigration = spark.sql('''
                             SELECT row_number() over (order by T.i94port) as immig_id,
                             T.i94port, I.cicid, T.average_temp, I.i94yr, I.i94mon, I.i94cit, 
                             I.arrdate, I.depdate, I.i94visa
                             FROM immigration I JOIN temperature T
                             ON I.i94port = T.i94port
                             ''')

In [25]:
dim_demographic.show(5)

+-----+-------+------+------+------+-------+-------+-------+--------------+
|cicid|i94port| i94yr|i94mon|i94cit|arrdate|depdate|i94visa|origin_country|
+-----+-------+------+------+------+-------+-------+-------+--------------+
|299.0|    NYC|2016.0|   4.0| 103.0|20545.0|20550.0|    2.0|       Austria|
|305.0|    NYC|2016.0|   4.0| 103.0|20545.0|20555.0|    2.0|       Austria|
|496.0|    CHI|2016.0|   4.0| 103.0|20545.0|20548.0|    1.0|       Austria|
|558.0|    SFR|2016.0|   4.0| 103.0|20545.0|20547.0|    1.0|       Austria|
|596.0|    NAS|2016.0|   4.0| 103.0|20545.0|20547.0|    2.0|       Austria|
+-----+-------+------+------+------+-------+-------+-------+--------------+
only showing top 5 rows



In [26]:
dim_temperature.show(5)

+-------+----------+-------------+--------+---------+------------+
|i94port|      City|      Country|Latitude|Longitude|average_temp|
+-------+----------+-------------+--------+---------+------------+
|    CHS|Charleston|United States|  32.95N|   79.47W|        18.7|
|    PHO|   Phoenix|United States|  32.95N|  112.02W|       21.05|
|    SAV|  Savannah|United States|  31.35N|   81.05W|       19.41|
|    OMA|     Omaha|United States|  40.99N|   95.86W|       10.05|
|    ANC| Anchorage|United States|  61.88N|  151.13W|        -2.3|
+-------+----------+-------------+--------+---------+------------+
only showing top 5 rows



In [27]:
fact_immigration.show(5)

+--------+-------+---------+------------+------+------+------+-------+-------+-------+
|immig_id|i94port|    cicid|average_temp| i94yr|i94mon|i94cit|arrdate|depdate|i94visa|
+--------+-------+---------+------------+------+------+------+-------+-------+-------+
|       1|    ABQ|5289835.0|       11.14|2016.0|   4.0| 266.0|20572.0|   null|    3.0|
|       2|    ABQ|5357011.0|       11.14|2016.0|   4.0| 582.0|20572.0|20642.0|    3.0|
|       3|    ABQ|5289836.0|       11.14|2016.0|   4.0| 266.0|20572.0|20590.0|    3.0|
|       4|    ANC|4310664.0|        -2.3|2016.0|   4.0| 209.0|20567.0|20588.0|    1.0|
|       5|    ANC|3504250.0|        -2.3|2016.0|   4.0| 135.0|20563.0|20640.0|    2.0|
+--------+-------+---------+------------+------+------+------+-------+-------+-------+
only showing top 5 rows



In [29]:
# Save the tables as Parquet files
dim_demographic.write.mode('overwrite').partitionBy("i94port").parquet("/sample/dim_demographic.parquet")
dim_temperature.write.mode('overwrite').parquet("/sample/dim_temperature.parquet")
fact_immigration.write.mode('overwrite').partitionBy("i94port").parquet("/sample/fact_immigration.parquet")
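`partitionBy("i94port")` writes one sub-directory per port using Hive-style `column=value` names, and the partition column is recovered from the directory name when the data is read back. Queries that filter on i94port can therefore skip the other directories. The sketch below only computes the directory names such a write would create for a few sample ports; it does not touch the file system, and `partition_dirs` is a hypothetical helper.

```python
def partition_dirs(base_path, column, values):
    """List the Hive-style partition directories a partitionBy(column) write creates."""
    return [f"{base_path}/{column}={value}" for value in sorted(set(values))]


ports = ["NYC", "ATL", "NYC", "CHI"]
for path in partition_dirs("/sample/fact_immigration.parquet", "i94port", ports):
    print(path)
# /sample/fact_immigration.parquet/i94port=ATL
# /sample/fact_immigration.parquet/i94port=CHI
# /sample/fact_immigration.parquet/i94port=NYC
```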

#### 4.2 Data Quality Checks
Row-count check: every table must contain at least one record.

In [None]:
def HasrowCheck(df, table):
    print(f'{table} table is under has row check')
    result = df.count()
    if result == 0:
        raise ValueError(f'{table} table has no record')
    print(f'{table} table has {result} records')

In [None]:
HasrowCheck(dim_demographic, 'dim_demographic')
HasrowCheck(dim_temperature, 'dim_temperature')
HasrowCheck(fact_immigration, 'fact_immigration')

Null check: the key column of each table must not contain nulls.

In [None]:
dim_demographic.createOrReplaceTempView("dim_demographic")
dim_temperature.createOrReplaceTempView("dim_temperature")
fact_immigration.createOrReplaceTempView("fact_immigration")

In [None]:
def nullCheck(table_dict):
    
    for table, column in table_dict.items():
        print(f'{table} table is under null check')
        records = spark.sql(f'SELECT * FROM {table} WHERE {column} IS NULL')
        if records.count() > 0:
            raise ValueError(f'Null value found in {table}.{column}')
        print(f'{table} table passed')

In [None]:
table_dict = {'dim_demographic': 'cicid',
              'dim_temperature': 'i94port',
              'fact_immigration': 'immig_id'}

nullCheck(table_dict)

Duplicates check: the key column of each table must be unique.

In [None]:
def duplicatesCheck(df, table, column):
    print(f'{table} table is under duplicates check')
    result = df.count()
    if result > df.dropDuplicates([column]).count():
        raise ValueError(f'{table} table has duplicates')
    print(f'{table} table passed')

In [None]:
duplicatesCheck(dim_demographic, 'dim_demographic', 'cicid')
duplicatesCheck(dim_temperature, 'dim_temperature', 'i94port')
duplicatesCheck(fact_immigration, 'fact_immigration', 'immig_id')
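The three checks can also be expressed without Spark, which makes their logic easy to test on small inputs. The sketch applies the same rules (at least one row, no nulls in the key column, no duplicate keys) to a list of dictionaries; the function name `run_quality_checks` and the sample rows are hypothetical.

```python
def run_quality_checks(rows, table, key):
    """Apply the row-count, null and duplicate checks to a list of dicts."""
    if not rows:
        raise ValueError(f"{table} table has no record")
    keys = [row.get(key) for row in rows]
    if any(k is None for k in keys):
        raise ValueError(f"Null value found in {table}.{key}")
    if len(keys) != len(set(keys)):
        raise ValueError(f"{table} table has duplicates")
    return f"{table} table passed ({len(rows)} records)"


print(run_quality_checks([{"immig_id": 1}, {"immig_id": 2}], "fact_immigration", "immig_id"))
# fact_immigration table passed (2 records)

try:
    run_quality_checks([{"cicid": 1.0}, {"cicid": 1.0}], "dim_demographic", "cicid")
except ValueError as err:
    print(err)  # dim_demographic table has duplicates
```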

#### 4.3 Examples of query

In [None]:
# Top 10 most visited destination cities
most_visited_city = spark.sql('''
                              SELECT COUNT(*) cnt, T.City
                              FROM fact_immigration I JOIN dim_temperature T
                              ON I.i94port = T.i94port
                              GROUP BY T.City
                              ORDER BY cnt DESC
                              LIMIT 10
                              ''')

most_visited_city.show()

In [None]:
# Top 10 origin countries of visitors
home_countries = spark.sql('''
                           SELECT COUNT(*) cnt, D.origin_country
                           FROM fact_immigration I JOIN dim_demographic D
                           ON I.cicid = D.cicid
                           WHERE D.origin_country IS NOT NULL
                           GROUP BY D.origin_country
                           ORDER BY cnt DESC
                           LIMIT 10
                           ''')

home_countries.show()

In [None]:
# The purpose of this data model is to check the correlation between temperature and the number of visits
corr_table = spark.sql('''
                       SELECT COUNT(*) cnt, average_temp
                       FROM fact_immigration
                       GROUP BY average_temp
                       ''')

corr_table.toPandas().corr()
# The resulting correlation coefficient is about 0.1, which indicates a weak correlation
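pandas' `DataFrame.corr()` computes the Pearson correlation coefficient by default and returns a 2x2 matrix here, so the coefficient of interest is its off-diagonal entry. The sketch below computes the same statistic by hand to show what the reported value measures; the sample numbers are made up. Because `corr_table` groups by average_temp, two ports that happen to share the same rounded average would be merged into one point; grouping by i94port and carrying average_temp along would avoid that.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of the same length (>= 2)")
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# Made-up (average_temp, visit count) pairs
temps = [-2.3, 10.05, 18.7, 21.05]
counts = [120, 900, 300, 450]
print(round(pearson(temps, counts), 3))
```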

#### 4.4 Data dictionary 

#### dim_demographic
* cicid: record ID from the SAS file
* i94port: 3-character code of the destination city
* i94yr: 4-digit year
* i94mon: numeric month
* i94cit: 3-digit numeric code of the origin country
* arrdate: date of arrival in the USA
* depdate: date of departure from the USA
* i94visa: purpose of the visit (1 = Business, 2 = Pleasure, 3 = Student)
* origin_country: origin country based on i94cit

#### dim_temperature
* i94port: 3-character code of the destination city
* City: city name
* Country: country name
* Latitude: latitude
* Longitude: longitude
* average_temp: average temperature of each city

#### fact_immigration
* immig_id: surrogate key generated with row_number() ordered by i94port
* i94port: 3-character code of the destination city
* cicid: record ID from the SAS file
* average_temp: average temperature of the destination city (from dim_temperature)
* i94yr: 4-digit year
* i94mon: numeric month
* i94cit: 3-digit numeric code of the origin country
* arrdate: date of arrival in the USA
* depdate: date of departure from the USA
* i94visa: purpose of the visit (1 = Business, 2 = Pleasure, 3 = Student)

### Step 5: Complete Project Write Up
* **Clearly state the rationale for the choice of tools and technologies for the project.**</br>
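We used Apache Spark for this project. Spark distributes processing across many cores or machines, so it handles datasets of this size (about 3.1 million immigration records and 8.6 million temperature records) quickly.
Its DataFrame and SQL APIs also make both the ETL steps and the analytical queries easy to write.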
* **Propose how often the data should be updated and why.**</br>
Month is the smallest time unit in both datasets (the immigration file covers a single month, and the temperature data has one record per city per month), so the data should be updated monthly.
* **Write a description of how you would approach the problem differently under the following scenarios:**
 * **The data was increased by 100x.**</br>
 If the data were 100x larger, a single local machine would no longer be enough. We would move the pipeline to the AWS cloud and load the tables into an Amazon Redshift cluster, which is designed for analytical queries over large datasets.
 * **The data populates a dashboard that must be updated on a daily basis by 7am every day.**</br>
 We would use Apache Airflow to schedule the pipeline so that it runs every day and finishes before 7am, and to monitor and retry failed runs.
 * **The database needed to be accessed by 100+ people.**</br>
 The datasets should be stored as Parquet files in an S3 bucket, so that many users can access them concurrently from outside the processing cluster.