
# Data Engineering Capstone Project

#### Project Summary

In this project, we build a complete ETL pipeline that processes information from several sources. The goal is to merge the information from these sources, process it, and create a data model that makes the data easier to analyze later. We work with Spark, and the data sets are:

- Immigration: detailed information about every person entering the United States.
- Cities: demographic information about cities and states.
- Temperature: global temperatures, measured monthly.

With these datasets, we build the data model and the ETL. The final purpose is to help analysts understand why travelers arrive at a given time of the year, taking into account the temperature in the country of origin and the temperature in the United States at that moment. Demographic information can also be added to this analysis: the age of the traveler, the median age of the visited state, the total population, or the male/female percentage.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Check data quality
* Step 6: Complete Project Write Up

In [3]:
# imported libraries
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import glob
import re
import math
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, asc, monotonically_increasing_id, to_timestamp, lit
from pyspark.sql.types import StringType, IntegerType

### Step 1: Scope the Project and Gather Data

Every dataset is going to be explained in the following points:

#### 1.1 Read immigration data using Spark. It reads all data from "sas_data" folder

This data set is named I94 Immigration. The data comes from the US National Tourism and Trade Office and consists of records of international visitors to the US. These records store information such as arrival and departure dates, visa, mode of entry, and personal information about the passenger such as gender, country and age. [Link to source](https://travel.trade.gov/research/reports/i94/historical/2016.html)


In this first step we load the raw data from the US National Tourism and Trade Office into a dataframe; later on we process it into our data model.

In [23]:

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

i94_fname = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
i94_files = glob.glob("../../data/18-83510-I94-Data-2016/*.sas7bdat")
# Only the April file. This small dataset contains more than 3 million records
immigrationDF = spark.read.format("com.github.saurfang.sas.spark").load(i94_fname)


# To load the full dataset uncomment the following line
#immigrationDF = spark.read.parquet("sas_data")



In [24]:
immigrationDF.show(3)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null|1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| n

In [11]:
#immigrationDF = immigrationDF.withColumnRenamed('_c0','immigration_id')
immigrationDF.printSchema()


root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

#### 1.2 Read global land temperature csv. 
This file contains the average temperature of each month and by city. [Link to source](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)

In [13]:
# read temperature data
temperature_fname = "../../data2/GlobalLandTemperaturesByCity.csv"
fnameTemperature = 'GlobalLandTemperaturesByCity.csv'
temperatureDF = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(temperature_fname)

In [14]:
temperatureDF.show(3)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 3 rows



#### 1.3 Read cities information csv. 
This file contains demographic information about US cities. This demographic information has some fields as the race, gender majority, median age, etc. [Link to source](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

In [109]:
# read us-cities data
usCitiesPath = 'us-cities-demographics.csv'
citiesDF = spark.read.format("csv").option("delimiter", ";").option("header", "true").load(usCitiesPath)

#### 1.4 Read airport information csv. 
This file contains airport details such as the IATA airport code, location, and other relevant information about each airport. [Link to source](http://ourairports.com/data/airports.csv)


In [16]:
# read airport-codes data
airportPath = 'airport-codes_csv.csv'
airportDF = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(airportPath)

### Step 2: Explore and Assess the Data
#### Explore the Data 
1. Explore the data
2. Filter null values in the fields that are important for the later analysis
3. Select the proper columns for the data model
4. Process and transform the data

#### 2.1 Exploring and parsing Immigration data set

In [25]:
pd.set_option("display.max.columns", None)
immigrationDF.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

The departure date (column "depdate") is important for our data model, so we first drop the rows where it is null. We do the same for the arrival date (column "arrdate").

In [26]:
immigrationDF = immigrationDF.dropna(subset=['depdate'])
immigrationDF = immigrationDF.dropna(subset=['arrdate'])

Now we filter the data, since several columns are not needed. First we remove the unneeded columns, and then we rename the rest so they are easier to understand:

In [29]:
immigrationDF.show(3)

+--------------+-------+------------+------------+----+----------+--------------+----+----+------+-------+-------+-------------+
|immigration_id|city_id|country_from|arrival_date|mode|state_code|departure_date|visa| age|gender|biryear|airline|flight_number|
+--------------+-------+------------+------------+----+----------+--------------+----+----+------+-------+-------+-------------+
|          15.0|    WAS|       101.0|     20545.0| 1.0|        MI|       20691.0| 2.0|55.0|     M| 1961.0|     OS|           93|
|          16.0|    NYC|       101.0|     20545.0| 1.0|        MA|       20567.0| 2.0|28.0|  null| 1988.0|     AA|        00199|
|          17.0|    NYC|       101.0|     20545.0| 1.0|        MA|       20567.0| 2.0| 4.0|  null| 2012.0|     AA|        00199|
+--------------+-------+------------+------------+----+----------+--------------+----+----+------+-------+-------+-------------+
only showing top 3 rows



In [28]:
immigrationDF = immigrationDF.select(col("cicid").alias("immigration_id"),
                                    col("i94port").alias("city_id"),
                                    col("i94cit").alias("country_from"),
                                    col("arrdate").alias("arrival_date"),
                                    col("i94mode").alias("mode"),
                                    col("i94addr").alias("state_code"),
                                    col("depdate").alias("departure_date"),
                                    col("i94visa").alias("visa"),
                                    col("i94bir").alias("age"),
                                    col("gender").alias("gender"),
                                    col("biryear").alias("biryear"),
                                    col("airline").alias("airline"),
                                    col("fltno").alias("flight_number"))

print((immigrationDF.count(), len(immigrationDF.columns)))



(2953856, 13)


After that we format the date values in these columns:

- Arrival date
- Departure date

In [30]:
# parse SAS numeric dates
@udf(StringType())
def sas_date_parser(sas_date):
    """
    Convert a SAS date into an ISO-formatted date string.

    Args:
        sas_date (float): number of days elapsed since 1960-01-01

    Returns:
        str: date as 'YYYY-MM-DD', or None when the input is null
    """
    if sas_date is not None:
        return (datetime(1960, 1, 1).date() + timedelta(days=sas_date)).isoformat()
    else:
        return None
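
As a quick check of the conversion, the following plain-Python sketch applies the same rule outside Spark. The function name `sas_date_to_iso` is introduced here for illustration only; the sample values are the arrival and departure numbers of the first row shown above.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day


def sas_date_to_iso(sas_date):
    """Return 'YYYY-MM-DD' for a SAS day count, or None for a missing value."""
    if sas_date is None:
        return None
    return (SAS_EPOCH + timedelta(days=sas_date)).isoformat()


print(sas_date_to_iso(20545.0))  # 2016-04-01
print(sas_date_to_iso(20691.0))  # 2016-08-25
```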


In [31]:
immigrationDF = immigrationDF.withColumn("arrival_date", sas_date_parser(immigrationDF.arrival_date))
immigrationDF = immigrationDF.withColumn("departure_date", sas_date_parser(immigrationDF.departure_date))

In [33]:
immigrationDF.limit(5).toPandas()


Unnamed: 0,immigration_id,city_id,country_from,arrival_date,mode,state_code,departure_date,visa,age,gender,biryear,airline,flight_number
0,15.0,WAS,101.0,2016-04-01,1.0,MI,2016-08-25,2.0,55.0,M,1961.0,OS,93
1,16.0,NYC,101.0,2016-04-01,1.0,MA,2016-04-23,2.0,28.0,,1988.0,AA,199
2,17.0,NYC,101.0,2016-04-01,1.0,MA,2016-04-23,2.0,4.0,,2012.0,AA,199
3,18.0,NYC,101.0,2016-04-01,1.0,MI,2016-04-11,1.0,57.0,,1959.0,AZ,602
4,19.0,NYC,101.0,2016-04-01,1.0,NJ,2016-04-14,2.0,63.0,,1953.0,AZ,602


The next step is to use the SAS description file to replace the numeric codes for country, visa and mode with their categories.

1. Read SAS description file content

In [34]:
i94_sas_label_descriptions_fname = "I94_SAS_Labels_Descriptions.SAS"
with open(i94_sas_label_descriptions_fname) as f:
    lines = f.readlines()


2. Format the column to be able to parse

In [35]:
@udf(IntegerType())
def parse_country(x):
    # drop the decimal part of the code (101.0 -> 101); nulls stay null
    return math.trunc(x) if x is not None else None

In [36]:

immigrationDF = immigrationDF.withColumn('country_from', parse_country(immigrationDF.country_from))


In [37]:
@udf(StringType())
def parse_sas_country_codes(x):
    """
    Translate a numeric country code of the immigration data into the country name
    found in the SAS description file. If the file has no single matching
    description, "No info" is returned.

    Args:
        x (int): numeric country code

    Returns:
        str: name of the country, or "No info"
    """
    x=str(x)
    # look for the code in the country section of the file (substring match)
    result = [i for i in lines[9:298] if x in i]

    # some codes have no description in the SAS file, or match several lines
    if len(result) != 1: 
        return "No info"
    
    result = re.search(' \'(.*?)\'', result[0])
    return result.group(1)
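
The function above scans the label lines with a substring test, so a code can also match lines that merely contain the same digits. A minimal alternative sketch, assuming each label line has the form `code = 'NAME'`, builds a dictionary once and then looks codes up exactly. The helper name `build_code_lookup` and the second sample line are made up for illustration.

```python
import re

# assumed line format: optional spaces, numeric code, '=', quoted label
LABEL_PATTERN = re.compile(r"^\s*(\d+)\s*=\s*'(.*?)'")


def build_code_lookup(label_lines):
    """Map each numeric code to its label, skipping lines that do not match."""
    lookup = {}
    for line in label_lines:
        match = LABEL_PATTERN.match(line)
        if match:
            lookup[int(match.group(1))] = match.group(2).strip()
    return lookup


sample_lines = [
    "   101 =  'ALBANIA'\n",
    "   1010 =  'MADE-UP ENTRY'\n",  # hypothetical line, not from the real file
    "/* a comment line that should be ignored */\n",
]
lookup = build_code_lookup(sample_lines)

print(lookup.get(101, "No info"))  # ALBANIA
print(lookup.get(10, "No info"))   # No info
```

With an exact lookup, code 101 resolves to a single label even though another line contains the same digits.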

3. Use the file to parse the codes

In [38]:
# parse country code
immigrationDF = immigrationDF.withColumn('country_from', parse_sas_country_codes(immigrationDF.country_from))


4. Do the same for mode and visa: implement functions to parse the values

In [39]:
@udf(StringType())
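def parse_sas_mode(data):
    """
    Map the numeric mode code to its label:
    1 = 'Air', 2 = 'Sea', 3 = 'Land', 9 = 'Not reported'

    Args:
        data (float): mode code from the immigration data

    Returns:
        str: label of the mode, or 'not supported' for any other value
    """
    mode = {1: 'Air', 2: 'Sea', 3: 'Land', 9: 'not reported'}

    if data in mode:
        return mode[data]

    return 'not supported'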

In [40]:
immigrationDF = immigrationDF.withColumn('mode', parse_sas_mode(immigrationDF.mode))

In [41]:
@udf(StringType())
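def parse_sas_visa(data):
    """
    Map the numeric visa code to its label:
    1 = 'Business', 2 = 'Pleasure', 3 = 'Student'

    Args:
        data (float): visa code from the immigration data

    Returns:
        str: label of the visa, or 'not supported' for any other value
    """
    visa = {1: 'Business', 2: 'Pleasure', 3: 'Student'}

    if data in visa:
        return visa[data]

    return 'not supported'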
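
Both lookups receive doubles such as `1.0` from Spark, while the dictionary keys are integers. This works because Python treats `1.0` and `1` as the same dictionary key, as the short check below shows. The dictionary is the one used for the visa codes above.

```python
visa = {1: 'Business', 2: 'Pleasure', 3: 'Student'}

# a float that compares equal to an int has the same hash, so the lookup succeeds
print(1.0 in visa)                     # True
print(visa[2.0])                       # Pleasure
print(visa.get(4.0, 'not supported'))  # not supported
```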

In [42]:
immigrationDF = immigrationDF.withColumn('visa', parse_sas_visa(immigrationDF.visa))

In [43]:
immigrationDF.limit(5).toPandas()

Unnamed: 0,immigration_id,city_id,country_from,arrival_date,mode,state_code,departure_date,visa,age,gender,biryear,airline,flight_number
0,15.0,WAS,ALBANIA,2016-04-01,Air,MI,2016-08-25,Pleasure,55.0,M,1961.0,OS,93
1,16.0,NYC,ALBANIA,2016-04-01,Air,MA,2016-04-23,Pleasure,28.0,,1988.0,AA,199
2,17.0,NYC,ALBANIA,2016-04-01,Air,MA,2016-04-23,Pleasure,4.0,,2012.0,AA,199
3,18.0,NYC,ALBANIA,2016-04-01,Air,MI,2016-04-11,Business,57.0,,1959.0,AZ,602
4,19.0,NYC,ALBANIA,2016-04-01,Air,NJ,2016-04-14,Pleasure,63.0,,1953.0,AZ,602


#### 2.2 Exploring and parsing Temperature data set

#### Cleaning Steps
Document steps necessary to clean the data

In [44]:
number_rows_before_clean = (temperatureDF.count())

Some columns are essential: AverageTemperature, dt, City and Country. A row with a null in any of them is not useful, so we remove those rows.

In [45]:
temperatureDF = temperatureDF.dropna(subset=['AverageTemperature', 'dt', 'City', 'Country'],how='any')
number_rows_after_clean = (temperatureDF.count())
print('Rows removed',number_rows_before_clean-number_rows_after_clean)

Rows removed 364130


Remove the columns that are not needed: Latitude, Longitude and AverageTemperatureUncertainty

In [46]:
temperatureDF = temperatureDF.drop('Latitude', 'Longitude', 'AverageTemperatureUncertainty')

Uppercase the country so that it matches the country values in the immigration data

In [47]:
@udf(StringType())
def uppercase(x):
    return str(x).upper()

In [48]:
temperatureDF = temperatureDF.withColumn("Country", uppercase(temperatureDF.Country))


We group all the data of each country by date. With this change we remove the city field and average the temperature of all the cities of a country for each date. The reason is that the immigration data has no city_from information, only country_from, so later in the analysis we can compare the temperature of the country of origin with that of the United States.

In [49]:
temperatureDF = temperatureDF.drop("City")

In [50]:
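# the CSV was read as strings, so cast before averaging
# (casting to "int" drops the decimal part; cast to "double" to keep it)
temperatureDF = temperatureDF.withColumn("AverageTemperature",col("AverageTemperature").cast("int"))
# group by country and date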
temperatureDF = temperatureDF.groupby(['Country','dt']).mean('AverageTemperature')
temperatureDF = temperatureDF.orderBy(asc('Country'))
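
Note that the cast to `int` happens before the average is taken. The plain-Python sketch below, with made-up readings, shows how dropping the decimal part first changes a monthly mean. It assumes the cast truncates toward zero, as Python's `int()` does.

```python
from statistics import mean

readings = [6.9, 7.8, 5.9]  # made-up city temperatures for one country and month

truncated = [int(t) for t in readings]  # [6, 7, 5]

print(mean(truncated))           # 6
print(round(mean(readings), 3))  # 6.867
```

If the full precision matters for the analysis, casting to `double` instead of `int` would keep it.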

In [52]:
temperatureDF.limit(5).toPandas()

Unnamed: 0,Country,dt,avg(AverageTemperature)
0,AFGHANISTAN,1846-11-01,6.142857
1,AFGHANISTAN,1910-12-01,-0.5
2,AFGHANISTAN,1852-06-01,23.375
3,AFGHANISTAN,1863-03-01,7.5
4,AFGHANISTAN,1878-04-01,12.625


#### 2.3 Exploring and parsing Cities dataset

1. First, check for null values to clean up.

In [181]:
citiesDF = spark.read.format("csv").option("delimiter", ";").option("header", "true").load(usCitiesPath)

In [182]:
citiesDF.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



Select columns to remove

In [183]:
citiesDF = citiesDF.drop('Number of Veterans', 'Foreign-born', 'Average Household Size', 'Race', 'Count')


In [184]:
citiesDF = citiesDF.select(col('City').alias('City'),
                            col('State').alias('State'),
                            col('Median Age').alias('Median_age'),
                            col('Male Population').alias('Male_Population'),
                            col('Female Population').alias('Female_Population'),
                            col('Total Population').alias('Total_Population'),
                            col('State Code').alias('State_code'))

In [185]:
citiesDF.columns

['City',
 'State',
 'Median_age',
 'Male_Population',
 'Female_Population',
 'Total_Population',
 'State_code']

Add some statistics about the gender population

In [186]:
@udf(StringType())
def get_proportion(a,b):
    """
    Calculate the percentage that a represents of b.
    It is used to get the percentage of males / females in the total population.

    Returns:
        percentage of a over b, or 0 when b is missing or zero
    """
    a = int(a or 0)
    b = int(b or 0)
    # guard against division by zero when the total is missing or zero
    if b != 0:
        return (a)/(b)*100
    else:
        return 0
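
The same calculation can be tried outside Spark. This sketch uses a hypothetical helper `percentage` and made-up population counts, and returns 0 when the total is missing or zero so that no division by zero occurs.

```python
def percentage(part, total):
    """Return part as a percentage of total, or 0.0 when total is missing or zero."""
    part = int(part or 0)
    total = int(total or 0)
    if total == 0:
        return 0.0
    return part / total * 100


print(percentage(50, 200))   # 25.0
print(percentage(10, None))  # 0.0
```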

In [187]:
# convert types to numeric
citiesDF = citiesDF.withColumn("Median_age",col("Median_age").cast("int"))
citiesDF = citiesDF.withColumn("Male_Population",col("Male_Population").cast("int"))
citiesDF = citiesDF.withColumn("Female_Population",col("Female_Population").cast("int"))
citiesDF = citiesDF.withColumn("Total_Population",col("Total_Population").cast("int"))


In [188]:
citiesDF = citiesDF.withColumn("Male_percentage", get_proportion(citiesDF['Male_Population'].cast("int"),citiesDF['Total_Population'].cast("int")))
citiesDF = citiesDF.withColumn("Female_percentage", get_proportion(citiesDF['Female_Population'],citiesDF['Total_Population']))

Uppercase state values

In [189]:
citiesDF = citiesDF.withColumn('State',uppercase(citiesDF.State))

In [190]:
citiesDF.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median_age: integer (nullable = true)
 |-- Male_Population: integer (nullable = true)
 |-- Female_Population: integer (nullable = true)
 |-- Total_Population: integer (nullable = true)
 |-- State_code: string (nullable = true)
 |-- Male_percentage: string (nullable = true)
 |-- Female_percentage: string (nullable = true)



The immigration data has no information about the destination city, but it does have the state code. So we work with that value, remove the city name, and average all the data per state code.

In [191]:
citiesDF = citiesDF.drop('City')
citiesDF = citiesDF.withColumn("Male_percentage",col("Male_percentage").cast("int"))
citiesDF = citiesDF.withColumn("Female_percentage",col("Female_percentage").cast("int"))
citiesDF = citiesDF.groupby(['State', 'State_Code']).mean('Median_age', 'Male_Population', 'Female_Population', 'Total_Population', 'Male_percentage', 'Female_percentage')

In [192]:
citiesDF.printSchema()

root
 |-- State: string (nullable = true)
 |-- State_Code: string (nullable = true)
 |-- avg(Median_age): double (nullable = true)
 |-- avg(Male_Population): double (nullable = true)
 |-- avg(Female_Population): double (nullable = true)
 |-- avg(Total_Population): double (nullable = true)
 |-- avg(Male_percentage): double (nullable = true)
 |-- avg(Female_percentage): double (nullable = true)



In [193]:
citiesDF.show(5)

+----------+----------+-----------------+--------------------+----------------------+---------------------+--------------------+----------------------+
|     State|State_Code|  avg(Median_age)|avg(Male_Population)|avg(Female_Population)|avg(Total_Population)|avg(Male_percentage)|avg(Female_percentage)|
+----------+----------+-----------------+--------------------+----------------------+---------------------+--------------------+----------------------+
|NEW MEXICO|        NM|             37.5|            102252.5|              107508.0|             209760.5|               48.25|                 50.75|
|  ILLINOIS|        IL|35.23076923076923|  120262.24175824175|    127148.63736263737|    247410.8791208791|   48.37362637362637|     50.62637362637363|
| MINNESOTA|        MN|35.05555555555556|   64422.27777777778|     66025.22222222222|             130447.5|  48.388888888888886|    50.611111111111114|
|CALIFORNIA|        CA|35.69970414201183|   90319.04142011834|     92290.94822485207|   

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model


The final data model is built from 3 different data sources: immigration, temperature and cities data.
With all this data we can build the following data model:

![dataModel](img/dataModel-FACT-MODEL1.png)


This data model consists of 5 tables. The star schema is built around the fact table:
- Immigration

4 dimension tables:
- City
- Airline
- Personal
- Temperature


This data model breaks down most of the immigration source data so that it is easy to understand. The other sources are filtered, cleaned and transformed based on the immigration data. Together they form a star schema in which it is easy to analyze personal information, the temperature of the country the traveler comes from, and demographic information about the state the traveler visits.



#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Load data from sources.
2. Analyze the data and clean null values.
3. Transform some data types.
4. Remove unnecessary data.
5. Create the fact and dimension tables.
6. Save the tables in parquet for downstream queries.
7. Check data quality.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

4.2 Create Personal dataframe

In [194]:
immigrationDF.columns

['immigration_id',
 'city_id',
 'country_from',
 'arrival_date',
 'mode',
 'state_code',
 'departure_date',
 'visa',
 'age',
 'gender',
 'biryear',
 'airline',
 'flight_number']

In [195]:
personal_table = immigrationDF.select(col('immigration_id'),col('gender'), col('age'), col('country_from'), col('biryear'))

4.3 Create City dataframe

In [196]:
citiesDF.columns

['State',
 'State_Code',
 'avg(Median_age)',
 'avg(Male_Population)',
 'avg(Female_Population)',
 'avg(Total_Population)',
 'avg(Male_percentage)',
 'avg(Female_percentage)']

In [197]:
cities_table = citiesDF.select(col('State'),col('avg(Median_age)').alias('Median_Age'), col('avg(Male_Population)').alias('Male_Population'), col('avg(Female_Population)').alias('Female_Population'),
       col('avg(Total_Population)').alias('Total_Population'), col('State_Code'), col('avg(Male_percentage)').alias('Male_percentage'), col('avg(Female_percentage)').alias('Female_percentage'))


4.4 Create Airline dataframe

In [198]:
#trip_immigration_id, airline_name, flight_number
airline_table = immigrationDF.select(col('immigration_id'), col('airline'), col('flight_number'))

4.5 Create Temperature dataframe

In [214]:
temperature_table = temperatureDF.select(col('dt'), col('avg(AverageTemperature)').alias('AverageTemperature'),  col('Country'))

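Add an id for each row. The ids produced by monotonically_increasing_id are unique and increasing, but not necessarily consecutive.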

In [216]:
temperature_table.columns

['dt', 'AverageTemperature', 'Country']

In [221]:
#temperature_table['temp_id'] = np.arange(temperature_table.shape[0]) 
temperatureDF = temperatureDF.withColumn("temp_id", monotonically_increasing_id())
temperature_table = temperature_table.withColumn("temp_id", monotonically_increasing_id())

In [222]:
temperature_table.columns

['dt', 'AverageTemperature', 'Country', 'temp_id']

4.6 Create Immigration dataframe

In [223]:
immigration_table = immigrationDF.select(col('immigration_id'), col('state_code'), col('arrival_date'), col('departure_date'), col('mode'), col('visa'))

Associate the temp_id of the temperature table, based on the traveler's country of origin and the date of arrival in the United States

In [224]:
immigration_table = immigration_table.withColumn('arrival_date', to_timestamp(immigration_table.arrival_date)) 
temperature_table = temperature_table.withColumn('dt', to_timestamp(temperature_table.dt)) 

In [225]:
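from pyspark.sql.functions import year, month

def add_temp_id(immigration, personal, temperature):
    """
    Add to every immigration row the temp_id of the temperature row that matches
    the traveler's country of origin and the year and month of arrival in the United States.
    Rows without a matching temperature get a null temp_id.

    Args:
        immigration (DataFrame): immigration table with immigration_id and arrival_date
        personal (DataFrame): personal table with immigration_id and country_from
        temperature (DataFrame): temperature table with temp_id, dt and Country

    Returns:
        DataFrame: immigration table with an extra temp_id column
    """
    # the country of origin is stored in the personal table
    origin = personal.select('immigration_id', 'country_from')
    # keep only the join keys and the id of each temperature row
    temps = temperature.select(col('temp_id'), col('Country').alias('temp_country'),
                               year('dt').alias('temp_year'), month('dt').alias('temp_month'))

    matched = immigration.join(origin, on='immigration_id', how='left').join(
        temps,
        (col('country_from') == col('temp_country'))
        & (year(col('arrival_date')) == col('temp_year'))
        & (month(col('arrival_date')) == col('temp_month')),
        how='left')
    return matched.drop('country_from', 'temp_country', 'temp_year', 'temp_month')

In [226]:
immigration_table = add_temp_id(immigration_table, personal_table, temperature_table)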
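
The matching rule described above (country of origin plus the year and month of arrival) amounts to a keyed lookup. A minimal plain-Python sketch with made-up rows follows. In the notebook the same idea would be a join between the Spark dataframes, and all names below are illustrative.

```python
from datetime import date

# made-up temperature rows: (temp_id, country, first day of the month)
temperature_rows = [
    (0, "ALBANIA", date(2013, 4, 1)),
    (1, "ALBANIA", date(2013, 5, 1)),
    (2, "SPAIN", date(2013, 4, 1)),
]

# index the rows by (country, year, month)
temp_index = {(country, dt.year, dt.month): temp_id
              for temp_id, country, dt in temperature_rows}


def find_temp_id(country, arrival_date):
    """Return the temp_id for the country and arrival month, or None if absent."""
    return temp_index.get((country, arrival_date.year, arrival_date.month))


print(find_temp_id("ALBANIA", date(2013, 4, 17)))  # 0
print(find_temp_id("SPAIN", date(2013, 5, 2)))     # None
```

Returning None for a missing key mirrors a left join: the traveler's row is kept even when no temperature is available for that country and month.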


### Write to parquet files

In [207]:
immigration_table.write.mode("overwrite").parquet("immigration")


In [208]:
personal_table.write.mode("overwrite").parquet("personal")


In [227]:
temperature_table.write.mode("overwrite").parquet("temperature")

In [210]:
cities_table.write.mode("overwrite").parquet("city")

#### 4.2 Data Quality Checks


There are several data quality checks, all of them in a separate notebook: Data_quality.ipynb.

1. Check that every table has been stored: this first tells us whether there was any error during the ETL, and then lets us check that the schema matches the designed data model.
2. Check that no table is empty: an empty table very likely means that an error happened while running the pipeline.
3. Check that no immigration_id is duplicated: once we are sure that the tables are well built and filled, we check that no primary key is duplicated, which is a very important check.

Refer to the "data_quality" file to run these tests.
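
The empty-table and duplicate-key checks can be sketched in plain Python over rows already loaded into memory. This is only an illustration with made-up rows and hypothetical function names; the actual checks are in Data_quality.ipynb and run against the stored tables.

```python
from collections import Counter


def check_not_empty(table_name, rows):
    """Fail when a table has no rows, which usually means the pipeline broke."""
    if len(rows) == 0:
        raise ValueError(f"table '{table_name}' is empty")


def find_duplicate_keys(rows, key):
    """Return the key values that appear more than once."""
    counts = Counter(row[key] for row in rows)
    return sorted(value for value, n in counts.items() if n > 1)


# made-up rows standing in for the immigration table
immigration_rows = [
    {"immigration_id": 15, "state_code": "MI"},
    {"immigration_id": 16, "state_code": "MA"},
    {"immigration_id": 16, "state_code": "MA"},
]

check_not_empty("immigration", immigration_rows)
print(find_duplicate_keys(immigration_rows, "immigration_id"))  # [16]
```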

#### 4.3 Data dictionary 
The following tables describe the fact and dimension tables. Every field has a description that explains its meaning.

FACT TABLE: IMMIGRATION

| Data field | Description |
| --- | --- |
| immigration_id | Immigration identifier, it's the PRIMARY KEY for every entrance |
| state_code | Code of the state that the person enters |
| arrival_date | Arrival date of the trip |
| departure_date | Departure date of the trip |
| mode | Mode of entry to the country (air, sea, land) |
| visa | Visa type: business, pleasure or student |
| temp_id | Identifier related to the temperature dimension table. It points to the row that shows the average temperature of the country of origin |

DIMENSION TABLE: PERSONAL

| Data field | Description |
| --- | --- |
| immigration_id | identifier related to primary key of immigration table |
| gender | Gender of the person. Male/female. |
| age | Age at the moment of the entry |
| country_from | Country of origin of the traveler |
| biryear | Birth year of the traveler |


DIMENSION TABLE: TEMPERATURE

| Data field | Description |
| --- | --- |
| temp_id | Unique ID, for each date and country. |
| dt | Date of the temperature measurement |
| AverageTemperature | Average temperature of all cities of the country for a certain month |
| Country | Country where the temperature was measured |



DIMENSION TABLE: CITY

| Data field | Description |
| --- | --- |
| State | Name of the visited state |
| Median Age | Median age, averaged over the cities of the state |
| Male Population | Average male population of the cities of the state |
| Male Percentage | Percentage of men |
| Female Percentage | Percentage of women |
| Female Population | Average female population of the cities of the state |
| Total Population | Average total population of the cities of the state |
| State Code | State code |




DIMENSION TABLE: AIRLINE

| Data field | Description |
| --- | --- |
| immigration_id | Identifier of the immigration record of the traveler who takes the flight |
| airline | Airline of flight service |
| flight_number | Flight number of the airline |


#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

We chose technologies that run in the cloud. Specifically, we used AWS and several of its services, such as S3 and Amazon Redshift as the data warehouse.
Spark is also heavily used, with large volumes of data in mind: it is designed for big data and executes code in parallel.

* Propose how often the data should be updated and why.

The data should be updated periodically, but not every table needs the same frequency. The fact table, 'immigration', should be updated at least once per week if we want to use this data for analytics and dashboards. Other data, such as the demographic information in the 'city' table, does not need a weekly update; we can update that table every 5 years, for example. Temperature data should be updated once per month.

* Write a description of how you would approach the problem differently under the following scenarios:

 * The data was increased by 100x.

 If the data grew large enough to need another service, we could move to AWS EMR, a big data processing service designed for large datasets.

 * The data populates a dashboard that must be updated on a daily basis by 7am every day.

 The best way to approach this is to use Apache Airflow and create a DAG scheduled to run daily, early enough to finish before 7am.

 * The database needed to be accessed by 100+ people.
 We could scale up the cloud data warehouse service so that it can handle every request.
