# Student Arrivals Data Model
### Data Engineering Capstone Project

#### Project Summary
I chose the Udacity-provided project, so I use the immigration and US cities demographics datasets along with two additional ones: airlines and visa types. The objective is to build a dimensional model in Amazon Redshift for analyzing arrivals of immigrants with student visas, and to populate it with an ETL process.

The project follows these steps:
* Step 1: Project Scope and data gathering
* Step 2: Exploring, Assessing and saving the Data
* Step 3: Defining the Data Model
* Step 4: Running the ETL
* Step 5: Project Write Up

In [11]:
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import configparser
import os
from pyspark.sql.functions import isnan, when, count, col, expr, year, month, date_format
from pyspark.sql import SQLContext

### <span style="color:blue">Step 1: Project Scope and data gathering</span>

#### 1.1 Scope
For this project, two of the four provided datasets are used, *I94 Immigration Data* and *U.S. City Demographic Data*, plus two extra datasets: *iataCodes.xlsx* and *visaType.csv*.

Only a subset of the immigration data is used: student visa records (I94VISA = 3). The end solution is a dimensional model built in Redshift for analyzing and reporting on student arrivals, for instance which US states students prefer, which nationalities arrive most often, and how many students arrive per month.

The datasets are preprocessed with Spark and Pandas in a Jupyter notebook in the Udacity Workspace and saved to S3 as parquet files. An ETL process written in Python then creates the staging, dimension and fact tables in Redshift and fills them with the data stored in S3.

#### 1.2 Data Sources description 

- **I94 Immigration Data**: This data comes from the US National Tourism and Trade Office. [source](https://travel.trade.gov/research/reports/i94/historical/2016.html).
Contains international visitor arrival statistics by world regions and select countries, type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries), etc.
- **U.S. City Demographic Data**: This data comes from OpenDataSoft. [source](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).
Contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey.
- **iataCodes.xlsx**: This data was gathered from a few sources:
[source1](https://www.iata.org/en/publications/directories/code-search/),
[source2](https://azcargo.cz/en/services/support/iata-airline-codes/)
[source3](https://en.wikipedia.org/wiki/List_of_airline_codes)
Contains the Airline codes and Airline names.
- **visaType.csv**: This data comes from the US National Tourism and Trade Office.
[source](https://www.trade.gov/i-94-arrivals-program)
Contains the Visa codes and Visa descriptions.  
- **I94_SAS_Labels_Descriptions.SAS**: This data was provided by Udacity.
Contains a short description for each column of the I94 Immigration Dataset. *State, Country, Entry mode and Port catalogs* are extracted from this file.
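
To make the catalog extraction concrete, here is a minimal sketch of how one `value` block of the labels file can be turned into code/description pairs. The `SAMPLE_SAS` text is an illustrative excerpt written for this example (it is an assumption about the layout, not a copy of the real file), and `parse_value_block` is a hypothetical helper that mirrors the slicing logic of the notebook's `get_catalog` function without pandas.

```python
# Illustrative excerpt only: the real I94_SAS_Labels_Descriptions.SAS file is assumed
# to contain blocks shaped like this one (a name, "code = 'label'" lines, a closing ';').
SAMPLE_SAS = """\
value i94model
    1 = 'Air'
    2 = 'Sea'
    3 = 'Land'
    9 = 'Not reported' ;
"""


def parse_value_block(text, name):
    """Return {code: label} for the block that starts at `name` and ends at ';'."""
    block = text[text.index(name):]       # drop everything before the block name
    block = block[:block.index(";")]      # keep everything up to the terminator
    pairs = {}
    for line in block.split("\n")[1:]:    # skip the header line with the block name
        parts = line.replace("'", "").split("=")
        if len(parts) == 2:               # ignore lines that are not "code = label"
            pairs[parts[0].strip()] = parts[1].strip()
    return pairs


modes = parse_value_block(SAMPLE_SAS, "i94model")
print(modes)
```

The codes come back as strings, which is why the notebook casts the country and mode codes to integers after loading them.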

### Loading Catalogs

In [12]:
def get_catalog(file, idx):
    """Reads the content of the I94_SAS_Labels_Descriptions.SAS file and returns a pandas data frame
       with the code/description pairs of the requested "value" block

       Input Arguments: file - file content, 
                        idx - name of the value block, example: i94addrl, i94cntyl
    """
    # keep the text from the block name up to the terminating ';'
    # (uses the `file` argument instead of the global f_content)
    block = file[file.index(idx):]
    block = block[:block.index(';')].split('\n')
    block = [i.replace("'", "") for i in block]
    # split each "code = description" line, skipping the header line
    dic = [i.split('=') for i in block[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    df = pd.DataFrame.from_dict(dic, orient="index").reset_index()
    df.columns = ["code","desc"]
    return df

# opens the file
with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')

#### Loading Country Catalog

In [13]:
df_country = get_catalog(f_content, "i94cntyl")

df_country.loc[len(df_country.index)] = ['99', 'All Other Codes']
df_country['code'] = df_country['code'].astype('int16')

print(df_country.dtypes)
df_country.head(2)

code     int16
desc    object
dtype: object


|   | code | desc |
|---|------|------|
| 0 | 582 | MEXICO Air Sea, and Not Reported (I-94, no lan... |
| 1 | 236 | AFGHANISTAN |


#### Loading mode Catalog

In [14]:
df_mode = get_catalog(f_content, "i94mode")

df_mode.loc[len(df_mode.index)] = ['-1', 'No info']
df_mode['code'] = df_mode['code'].astype('int16')

print(df_mode.dtypes)
df_mode.head(2)

code     int16
desc    object
dtype: object


|   | code | desc |
|---|------|------|
| 0 | 1 | Air |
| 1 | 2 | Sea |


#### Loading Port Catalog

In [15]:
df_port = get_catalog(f_content, "$i94prtl")

df_port.loc[len(df_port.index)] = ['-1', 'No info']

print(df_port.dtypes)
df_port.head(2)

code    object
desc    object
dtype: object


|   | code | desc |
|---|------|------|
| 0 | ALC | ALCAN, AK |
| 1 | ANC | ANCHORAGE, AK |


#### Loading State Catalog

In [16]:
df_addr = get_catalog(f_content, "i94addrl")

print(df_addr.dtypes)
df_addr.head(2)

code    object
desc    object
dtype: object


|   | code | desc |
|---|------|------|
| 0 | AL | ALABAMA |
| 1 | AK | ALASKA |


### Loading "Main" table (student visa only!)

In [17]:
# Reading configuration file
config = configparser.ConfigParser()
config.read_file(open('aws/credentials.cfg'))
output_data = config['AWS']['S3_BUCKET']

# Building a spark session with a connection to AWS S3
spark = SparkSession.builder \
        .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.2") \
        .config("spark.hadoop.fs.s3a.access.key",config['AWS']['AWS_ACCESS_KEY_ID']) \
        .config("spark.hadoop.fs.s3a.secret.key",config['AWS']['AWS_SECRET_ACCESS_KEY']) \
        .getOrCreate()
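
The cell above expects an `[AWS]` section in `aws/credentials.cfg`. As a rough sketch, the file could look like the string below; the three key names come from the notebook, while the placeholder values are assumptions. The bucket value is assumed to use the `s3a://` scheme and to end with a slash, because later cells build paths as `f"{output_data}I94-Data"`.

```python
import configparser

# Placeholder values only; never commit real credentials.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>
S3_BUCKET = s3a://<your-bucket>/
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)          # the notebook reads the same content from a file

output_data = config["AWS"]["S3_BUCKET"]
target = f"{output_data}I94-Data"       # same path construction as the parquet write cell
print(target)
```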

In [18]:
# months and fields to load
month_list = ["jan","feb","mar","apr","may","jun","jul","aug","sep","oct","nov","dec"]
field_list = ['cicid', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode', 'i94addr', 
              'depdate', 'i94bir', 'entdepa', 'entdepd', 'entdepu', 'matflag', 
              'biryear', 'dtaddto', 'gender', 'airline', 'fltno', 'visatype']

for month in month_list:
    fname = f'../../data/18-83510-I94-Data-2016/i94_{month}16_sub.sas7bdat'
    # student visa only!
    df_t = spark.read.format('com.github.saurfang.sas.spark').load(fname)
    df_t = df_t.filter(df_t.i94visa == 3.0).select(field_list)
   
    if month == 'jan':
        df_final = df_t
    else:
        df_final = df_final.union(df_t)

In [19]:
%%time
df_final = df_final.cache()
print(f"Number of rows loaded: {df_final.count()}")

Number of rows loaded: 1573271
CPU times: user 60.7 ms, sys: 7.41 ms, total: 68.1 ms
Wall time: 7min 7s


In [20]:
df_final.show(2)

+-----+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+------+-------+-----+--------+
|cicid|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|airline|fltno|visatype|
+-----+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+------+-------+-----+--------+
|  7.0| 101.0| 101.0|    BOS|20465.0|    1.0|     MA|   null|  20.0|      T|   null|   null|   null| 1996.0|    D/S|     M|     LH|  424|      F1|
|  8.0| 101.0| 101.0|    BOS|20465.0|    1.0|     MA|   null|  20.0|      T|   null|   null|   null| 1996.0|    D/S|     M|     LH|  424|      F1|
+-----+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+------+-------+-----+--------+
only showing top 2 rows



### Loading extra data sets

#### Loading *us-cities-demographics.csv* file

In [21]:
fname = './data/us-cities-demographics.csv'
df_demo = pd.read_csv(fname, sep=';')

print(df_demo.dtypes)
df_demo.head(2)

City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object


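|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|------|-------|------------|-----------------|-------------------|------------------|--------------------|--------------|------------------------|------------|------|-------|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |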


#### Loading *visaType.csv* file

In [22]:
fname = './data/visaType.csv'
df_visa = pd.read_csv(fname, sep=';')

print(df_visa.dtypes)
df_visa.head(2)

visa_code    object
visa_desc    object
dtype: object


|   | visa_code | visa_desc |
|---|-----------|-----------|
| 0 | F1 | Visa Holder: Non-immigrant student and exchang... |
| 1 | F2 | Visa Holder: Spouse or Child of Academic Student |


#### Loading *iataCodes.xlsx* file

In [23]:
fname = './data/iataCodes.xlsx'
df_airline = pd.read_excel(fname, converters={'iata_code':str,'iata_airlines':str})

print(df_airline.dtypes)
df_airline.head(2)

iata_code        object
iata_airlines    object
dtype: object


|   | iata_code | iata_airlines |
|---|-----------|---------------|
| 0 | AA | American Airlines |
| 1 | 2G | CargoItalia (alternate) |


# -------------------------------------------------------------------------------------------------------------------

### <span style="color:blue">Step 2: Exploring, Assessing and saving the Data</span>
#### 2.1 Exploring the Data 
Data is explored and cleaned using data wrangling functions and methods like `printSchema()`, `isNull()`, `isnan()`, `groupby()`, `filter()`, `count()`, etc.

#### 2.2 Cleaning Steps

For I94 Immigration Data:
- Checking duplicates: every record whose cicid appears more than once was removed (cicid must be unique).
- Checking missing values: missing values were replaced by default values (-1: no data, 99: all other values, 0, or '').
- Casting: several fields were cast from Double to Integer, and arrdate and depdate were converted to dates.
- Checking non-existent codes: codes that don't exist in the catalogs were replaced by default values.

For us-cities-demographics:
- Was pivoted to have ethnicity data in columns.
- Was aggregated by State.

#### 2.3 Saving data to S3 in parquet files
<img src="./s3.JPG" width="300" height="300" align="left"/>

### Exploring and Cleaning the "Main table" (I94 Immigration Data)

In [24]:
#Number of rows loaded: 1,573,271
print(df_final.count())
df_final.printSchema()

1573271
root
 |-- cicid: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



#### Checking duplicates

In [25]:
# number of cicid duplicated
print(f"cicid duplicated: {df_final.groupby('cicid').count().filter(col('count')>1).count()}")

# total rows to drop
rows_dup = df_final.groupby('cicid').count().filter(col('count')>1).agg({'count':'sum'}).collect()[0]['sum(count)']
print(f"Total rows Duplicated: {rows_dup}")

cicid duplicated: 125296
Total rows Duplicated: 256947


In [26]:
# number of rows remaining after dropping, and percentage of rows dropped
print(f"Number of rows after dropping duplicated rows: {1573271 - rows_dup}")
print(f"Percentage of duplicated rows dropped: {round(rows_dup/1573271 * 100, 2)}%")

Number of rows after dropping duplicated rows: 1316324
Percentage of duplicated rows dropped: 16.33%


In [27]:
# Identifying and saving the duplicated rows
df_final_dup = df_final.groupby('cicid').count().filter(col('count')>1)
df_final_dup.show(2)

+-------+-----+
|  cicid|count|
+-------+-----+
|17267.0|    2|
|21911.0|    2|
+-------+-----+
only showing top 2 rows



In [28]:
# dropping duplicates
df_final2 = df_final.join(df_final_dup, on=['cicid'], how='left_anti')
df_final2 = df_final2.cache()

#rows after dropping duplicates
df_final2.count()

1316324
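
Note that the anti-join above removes every row whose `cicid` is duplicated, not only the extra copies, which is why 256,947 rows disappear for 125,296 duplicated ids. The sketch below illustrates the difference on a toy pandas frame; the data is made up, and `keep=False` / `keep="first"` are the pandas counterparts of the two strategies, not the Spark code the notebook runs.

```python
import pandas as pd

# Toy data: cicid 2 appears twice, cicid 3 three times.
df = pd.DataFrame({
    "cicid":    [1, 2, 2, 3, 3, 3, 4],
    "visatype": ["F1", "F1", "F1", "M1", "M1", "F2", "F1"],
})

# Strategy used in the notebook: drop ALL rows that share a duplicated key.
dup_mask = df.duplicated("cicid", keep=False)
dropped_all = df[~dup_mask]

# Alternative: keep one row per key and drop only the extra copies.
kept_first = df.drop_duplicates("cicid", keep="first")

print(f"rows flagged as duplicated: {dup_mask.sum()}")
print(f"rows left when dropping all copies: {len(dropped_all)}")
print(f"rows left when keeping the first copy: {len(kept_first)}")
```

Dropping all copies is the conservative choice when there is no way to tell which copy is correct; keeping the first would retain more rows at the cost of an arbitrary pick.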

#### Checking missing values

In [29]:
df_final2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_final.columns]).show()

+-----+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+------+-------+-----+--------+
|cicid|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|airline|fltno|visatype|
+-----+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+------+-------+-----+--------+
|    0|  1399|     0|      0|      0|   1001|  41128| 866828|    28|     23| 867670|1310466| 866746|     28|   1424|  1120|  43504| 2331|       0|
+-----+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+------+-------+-----+--------+



In [30]:
# Replacing missing values
df_final2 = df_final2.fillna({'i94cit':-1, 'i94mode':-1, 'i94addr':'99', 'depdate':0, 'i94bir':0, 'entdepa':'', 'entdepd':'',
                              'entdepu':'', 'matflag':'', 'biryear':0, 'dtaddto':'', 'gender':'', 'airline': '-1', 'fltno': '-1'})

In [31]:
df_final2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_final.columns]).show()

+-----+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+------+-------+-----+--------+
|cicid|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|airline|fltno|visatype|
+-----+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+------+-------+-----+--------+
|    0|     0|     0|      0|      0|      0|      0|      0|     0|      0|      0|      0|      0|      0|      0|     0|      0|    0|       0|
+-----+------+------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+------+-------+-----+--------+



#### Casting

In [32]:
# casting these columns from double to integer
cast_lst = ['cicid','i94cit','i94res','arrdate','i94mode','depdate','i94bir','biryear']

for col_name in cast_lst:
    df_final2 = df_final2.withColumn(col_name, col(col_name).cast('int'))
                         
# convert arrdate and depdate from double to date   
df_final2 = df_final2.withColumn('arrdate',expr("date_add('1960-01-01',arrdate)")) \
                     .withColumn('depdate',expr("date_add('1960-01-01',depdate)"))
    
df_final2.show(2)

+-----+------+------+-------+----------+-------+-------+----------+------+-------+-------+-------+-------+-------+-------+------+-------+-----+--------+
|cicid|i94cit|i94res|i94port|   arrdate|i94mode|i94addr|   depdate|i94bir|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|airline|fltno|visatype|
+-----+------+------+-------+----------+-------+-------+----------+------+-------+-------+-------+-------+-------+-------+------+-------+-----+--------+
|  299|   103|   103|    BOS|2016-01-30|      1|     RI|1960-01-01|    25|      T|       |       |       |   1991|    D/S|     M|     LX|   52|      F1|
|  305|   103|   103|    FTL|2016-01-04|      1|     FL|1960-01-01|    24|      T|       |       |       |   1992|    D/S|     M|     BW|  480|      F1|
+-----+------+------+-------+----------+-------+-------+----------+------+-------+-------+-------+-------+-------+-------+------+-------+-----+--------+
only showing top 2 rows
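
The `date_add('1960-01-01', ...)` expressions work because SAS stores dates as the number of days since 1960-01-01. A small plain-Python sketch of the same conversion follows; `sas_to_date` is a hypothetical helper name used only for this illustration.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date scale


def sas_to_date(days):
    """Convert a SAS day count (possibly stored as a float) to a calendar date."""
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20465.0))  # raw arrdate value from the first rows shown earlier
print(sas_to_date(0))        # the default used for a missing depdate
```

Because missing `depdate` values were filled with 0 before the conversion, they show up as 1960-01-01 in the output above; that date acts as a "no departure recorded" sentinel rather than a real departure date.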



### Exploring and Cleaning the "Catalogs" codes

#### "Country" Catalog

In [33]:
pd_df_cntry = df_final2.select('i94cit').dropDuplicates().withColumnRenamed("i94cit","code").toPandas()
pd_df_cntry2 = df_final2.select('i94res').dropDuplicates().withColumnRenamed("i94res","code").toPandas()

pd_df_cntryf = pd.concat([pd_df_cntry,pd_df_cntry2], join='outer').drop_duplicates()

In [34]:
# number of codes that don't exist in the country catalog
df_tmp = set(pd_df_cntryf.code) - set(df_country.code)
print(f" Number of codes that don't exist in the country catalog: {len(df_tmp)}")

 Number of codes that don't exist in the country catalog: 29


In [35]:
# setting to 99 the codes that don't exist in the country catalog
df_final2 = df_final2.withColumn('i94cit', when(df_final2.i94cit.isin(df_tmp), 99).otherwise(df_final2.i94cit)) \
                     .withColumn('i94res', when(df_final2.i94res.isin(df_tmp), 99).otherwise(df_final2.i94res))
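
The catalog checks in this section all follow the same pattern: take the set of codes observed in the data, subtract the set of codes in the catalog, and map whatever remains to a default code that does exist in the catalog. A minimal sketch with plain Python sets and made-up codes:

```python
# Made-up values for illustration; 99 plays the role of "All Other Codes",
# which the notebook appends to the country catalog so the default is always valid.
catalog_codes = {99, 101, 103, 582}
observed_codes = [101, 103, 999, 582, 777]

unknown = set(observed_codes) - catalog_codes
cleaned = [99 if code in unknown else code for code in observed_codes]

print(f"unknown codes: {sorted(unknown)}")
print(f"cleaned codes: {cleaned}")
```

After the replacement every code in the data has a matching catalog row, so joins between the fact table and the dimension cannot lose records.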

#### "Mode" Catalog

In [36]:
pd_df_mode = df_final2.select('i94mode').dropDuplicates().toPandas()

In [37]:
# number of codes that don't exist in the mode catalog
df_tmp = set(pd_df_mode.i94mode) - set(df_mode.code)
print(f" Number of codes that don't exist in the mode catalog: {len(df_tmp)}")

 Number of codes that don't exist in the mode catalog: 1


In [38]:
# setting to -1 the codes that don't exist in the mode catalog
df_final2 = df_final2.withColumn('i94mode', when(df_final2.i94mode == 0,-1).otherwise(df_final2.i94mode))

#### "Port" Catalog

In [39]:
pd_df_port = df_final2.select('i94port').dropDuplicates().toPandas()

In [40]:
# number of codes that don't exist in the port catalog
df_tmp = set(pd_df_port.i94port) - set(df_port.code)
print(f" Number of codes that don't exist in the port catalog: {len(df_tmp)}")

 Number of codes that don't exist in the port catalog: 0


#### "State" Catalog

In [41]:
pd_df_state = df_final2.select('i94addr').dropDuplicates().toPandas()

In [42]:
# number of codes that don't exist in the state catalog
df_tmp = set(pd_df_state.i94addr) - set(df_addr.code)
print(f" Number of codes that don't exist in the state catalog: {len(df_tmp)}")

 Number of codes that don't exist in the state catalog: 27


In [43]:
# setting to 99 the codes that don't exist in the state catalog
df_final2 = df_final2.withColumn('i94addr', when(df_final2.i94addr.isin(df_tmp), '99').otherwise(df_final2.i94addr))

#### "Visa" Catalog

In [44]:
pd_df_visa = df_final2.select('visatype').dropDuplicates().toPandas()

In [45]:
# number of codes that don't exist in the visa catalog
df_tmp = set(pd_df_visa.visatype) - set(df_visa.visa_code)
print(f" Number of codes that don't exist in the visa catalog: {len(df_tmp)}")

 Number of codes that don't exist in the visa catalog: 0


#### "Airline" Catalog

In [46]:
pd_df_airline = df_final2.select('airline').dropDuplicates().toPandas()

In [47]:
# number of codes that don't exist in the airline catalog
df_tmp = set(pd_df_airline.airline) - set(df_airline.iata_code)
print(f" Number of codes that don't exist in the airline catalog: {len(df_tmp)}")

 Number of codes that don't exist in the airline catalog: 229


In [48]:
# setting to 99 the codes that don't exist in the airline catalog
df_final2 = df_final2.withColumn('airline', when(df_final2.airline.isin(df_tmp), '99').otherwise(df_final2.airline))

### Exploring and cleaning: *us-cities-demographics*

In [49]:
df_demo.head(2)

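|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|------|-------|------------|-----------------|-------------------|------------------|--------------------|--------------|------------------------|------------|------|-------|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |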


In [50]:
df_demot = df_demo.drop(columns = ['Race','Count']).drop_duplicates()
len(df_demot)

596

In [51]:
# pivoting the Race/Count pairs into one column per race, keyed by state code and city

df_demo_piv = (df_demo.set_index(["State Code", "City"])
                      .pivot(columns="Race")['Count']
                      .reset_index()
                      .rename_axis(None, axis=1)
               )

In [52]:
df_demof = df_demot.merge(df_demo_piv, how='inner', on=['State Code','City'])
print(len(df_demof))

596


In [53]:
df_demof.head(2)

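|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | American Indian and Alaska Native | Asian | Black or African-American | Hispanic or Latino | White |
|---|------|-------|------------|-----------------|-------------------|------------------|--------------------|--------------|------------------------|------------|-----------------------------------|-------|---------------------------|--------------------|-------|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | 1084.0 | 8841.0 | 21330.0 | 25924.0 | 37756.0 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | 351.0 | 30473.0 | 3917.0 | 2566.0 | 58723.0 |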


In [54]:
df_demof = df_demof.drop(columns = ['City','State'])

# aggregating the city rows by state
df_demof = df_demof.groupby('State Code').agg({'Median Age':'median', 
                                                'Male Population':'sum', 
                                                'Female Population':'sum', 
                                                'Total Population':'sum', 
                                                'Number of Veterans':'sum',
                                                'Foreign-born':'sum',
                                                'Average Household Size':'mean',
                                                'American Indian and Alaska Native':'sum',
                                                'Black or African-American':'sum',
                                                'Hispanic or Latino':'sum',
                                                'White':'sum',
                                                'Asian':'sum'                                  
                                               })
# formatting column names
df_demof.reset_index(level=0, inplace=True)
df_demof.rename(columns=lambda x: x.replace(" ", "_"), inplace=True)
df_demof.rename(columns=lambda x: x.replace("-", "_"), inplace=True)
df_demof.rename(columns=str.lower, inplace=True)

In [55]:
print(len(df_demof))
df_demof.head(2)

49


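|   | state_code | median_age | male_population | female_population | total_population | number_of_veterans | foreign_born | average_household_size | american_indian_and_alaska_native | black_or_african_american | hispanic_or_latino | white | asian |
|---|------------|------------|-----------------|-------------------|------------------|--------------------|--------------|------------------------|-----------------------------------|---------------------------|--------------------|-------|-------|
| 0 | AK | 32.2 | 152945.0 | 145750.0 | 298695 | 27492.0 | 33258.0 | 2.77 | 36339.0 | 23107.0 | 27261.0 | 212696.0 | 36825.0 |
| 1 | AL | 38.0 | 497248.0 | 552381.0 | 1049629 | 71543.0 | 52154.0 | 2.434286 | 8084.0 | 521068.0 | 39313.0 | 498920.0 | 28769.0 |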
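
The demographics file repeats the city-level columns once per race, so the order of operations matters: deduplicate the city rows first, pivot the race counts into columns, and only then aggregate by state. The sketch below walks through that order on a tiny frame. The Silver Spring and Quincy figures are taken from the rows shown earlier; "Sample City" and its numbers are made up, and the column names are simplified for the example.

```python
import pandas as pd

rows = [
    # state, city, total population, race, count
    ("MD", "Silver Spring", 82463, "White", 37756),
    ("MD", "Silver Spring", 82463, "Asian", 8841),
    ("MD", "Sample City", 100000, "White", 60000),   # hypothetical city
    ("MD", "Sample City", 100000, "Asian", 5000),    # hypothetical city
    ("MA", "Quincy", 93629, "White", 58723),
    ("MA", "Quincy", 93629, "Asian", 30473),
]
df = pd.DataFrame(rows, columns=["state_code", "city", "total_population", "race", "count"])

# 1. One row per city: drop the per-race columns, then deduplicate.
cities = df.drop(columns=["race", "count"]).drop_duplicates()

# 2. Race counts as columns, one row per (state, city).
by_race = (df.set_index(["state_code", "city", "race"])["count"]
             .unstack("race")
             .reset_index()
             .rename_axis(None, axis=1))

# 3. Merge and aggregate by state.
merged = cities.merge(by_race, on=["state_code", "city"], how="inner")
per_state = merged.groupby("state_code", as_index=False)[
    ["total_population", "Asian", "White"]].sum()
print(per_state)
```

Summing `total_population` directly on the original long table would count each city once per race, which is the error the deduplication step avoids.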


### Saving all data sets in S3 (as parquet files)

#### Loading the main table to s3

In [73]:
%%time
#write to parquet
print(f"#### Loading data to {output_data}I94-Data... ####")

# reducing from 200 partitions to 2, to reduce the loading time to s3
df_final2 = df_final2.coalesce(2)
df_final2.write.parquet(f"{output_data}I94-Data")

#### Loading data to s3a://jorge-udacity-bucket2/I94-Data... ####
CPU times: user 8.19 ms, sys: 0 ns, total: 8.19 ms
Wall time: 33.9 s


In [56]:
df_final2.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: date (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = false)
 |-- depdate: date (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- entdepa: string (nullable = false)
 |-- entdepd: string (nullable = false)
 |-- entdepu: string (nullable = false)
 |-- matflag: string (nullable = false)
 |-- biryear: integer (nullable = true)
 |-- dtaddto: string (nullable = false)
 |-- gender: string (nullable = false)
 |-- airline: string (nullable = false)
 |-- fltno: string (nullable = false)
 |-- visatype: string (nullable = true)



#### Converting data frames from pandas to spark

In [57]:
sc = spark.sparkContext
sqlCtx = SQLContext(sc)

schema = StructType([ \
    StructField("code", IntegerType(), True), \
    StructField("desc",StringType(),True) \
  ])

# converting pandas df to spark df
dfs_demof = sqlCtx.createDataFrame(df_demof)
dfs_airline = sqlCtx.createDataFrame(df_airline)
dfs_visa = sqlCtx.createDataFrame(df_visa)
dfs_addr = sqlCtx.createDataFrame(df_addr)
dfs_port = sqlCtx.createDataFrame(df_port)
dfs_mode = sqlCtx.createDataFrame(df_mode, schema=schema)
dfs_country = sqlCtx.createDataFrame(df_country, schema=schema)

In [58]:
dfs_demof.printSchema()

root
 |-- state_code: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: double (nullable = true)
 |-- female_population: double (nullable = true)
 |-- total_population: long (nullable = true)
 |-- number_of_veterans: double (nullable = true)
 |-- foreign_born: double (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- american_indian_and_alaska_native: double (nullable = true)
 |-- black_or_african_american: double (nullable = true)
 |-- hispanic_or_latino: double (nullable = true)
 |-- white: double (nullable = true)
 |-- asian: double (nullable = true)



In [59]:
dfs_airline.printSchema()

root
 |-- iata_code: string (nullable = true)
 |-- iata_airlines: string (nullable = true)



In [60]:
dfs_visa.printSchema()

root
 |-- visa_code: string (nullable = true)
 |-- visa_desc: string (nullable = true)



In [61]:
dfs_addr.printSchema()

root
 |-- code: string (nullable = true)
 |-- desc: string (nullable = true)



In [62]:
dfs_port.printSchema()

root
 |-- code: string (nullable = true)
 |-- desc: string (nullable = true)



In [63]:
dfs_mode.printSchema()

root
 |-- code: integer (nullable = true)
 |-- desc: string (nullable = true)



In [64]:
dfs_country.printSchema()

root
 |-- code: integer (nullable = true)
 |-- desc: string (nullable = true)



#### Saving metadata to a local file

In [65]:
meta_data = {}

meta_data['table'] = ["dim_country", "dim_entry_mode", "dim_visa_type", "dim_entry_port",
                      "dim_airline", "dim_date", "dim_destination_state", "student_arrivals_fact"]

meta_data['desc_column'] = ["description", "description", "description", "description", 
                            "description", "", "state_name", ""]

arrdate_count = df_final2.select('arrdate').dropDuplicates().count()

meta_data['expected_rows'] = [len(df_country), len(df_mode), len(df_visa), len(df_port),
                              len(df_airline), arrdate_count, len(df_addr), df_final2.count()]

df_meta_data = pd.DataFrame.from_dict(meta_data)
df_meta_data

|   | table | desc_column | expected_rows |
|---|-------|-------------|---------------|
| 0 | dim_country | description | 290 |
| 1 | dim_entry_mode | description | 5 |
| 2 | dim_visa_type | description | 20 |
| 3 | dim_entry_port | description | 661 |
| 4 | dim_airline | description | 197 |
| 5 | dim_date |  | 366 |
| 6 | dim_destination_state | state_name | 55 |
| 7 | student_arrivals_fact |  | 1316324 |


In [66]:
print(f"#### saving metadata file ####")
df_meta_data.to_json("./metadata.json")

#### saving metadata file ####


#### Loading Catalogs and extra data sets to s3

In [85]:
%%time
print(f"#### Loading Catalogs and extra data sets to S3... ####")

dfs_demof.write.parquet(f"{output_data}demographics")
dfs_airline.write.parquet(f"{output_data}airlines")
dfs_visa.write.parquet(f"{output_data}visas")
dfs_addr.write.parquet(f"{output_data}states")
dfs_port.write.parquet(f"{output_data}port")
dfs_mode.write.parquet(f"{output_data}mode")
dfs_country.write.parquet(f"{output_data}countries")

#### Loading Catalogs and extra data sets to S3... ####
CPU times: user 13.8 ms, sys: 189 µs, total: 13.9 ms
Wall time: 1min 16s


# -------------------------------------------------------------------------------------------------------------------

### <span style="color:blue">Step 3: Defining the Data Model</span>
#### 3.1 Conceptual Data Model
I chose a dimensional model in Redshift because the data is structured and relatively small (1.3 million rows). If the data grows, that will not be a problem, since a Redshift cluster is scalable, distributed, powerful and cost-effective. The dataset structures also map naturally onto a dimensional model.

The data model has one fact table and seven dimension tables.

1- Fact table - **student_arrivals_fact**: is filled with the *i94 immigration data*

2- Dimensional tables
- **dim_visa_type**: is filled with the *visaType.csv* dataset.
- **dim_entry_port**: is filled with the data extracted from *I94_SAS_Labels_Descriptions.SAS*
- **dim_airline**: is filled with the *iataCodes.xlsx* dataset
- **dim_date**: is filled with the arrival_date field of *student_arrivals_fact* table.
- **dim_country**: is filled with the data extracted from *I94_SAS_Labels_Descriptions.SAS*
- **dim_entry_mode**: is filled with the data extracted from *I94_SAS_Labels_Descriptions.SAS*
- **dim_destination_state**: is filled with the data extracted from *I94_SAS_Labels_Descriptions.SAS* and *us-cities-demographics.csv*
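
As an illustration of how a date dimension can be derived from arrival dates, the sketch below builds one row per calendar day of 2016. The attribute names (`year`, `month`, `day`, `week`, `weekday`) are assumptions made for this example; the actual columns of **dim_date** are defined in the project's data dictionary and SQL scripts, which are not shown here.

```python
from datetime import date, timedelta


def date_dimension_row(d):
    """Build one hypothetical dim_date row from a calendar date."""
    _iso_year, iso_week, iso_weekday = d.isocalendar()
    return {
        "date": d.isoformat(),
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": iso_week,        # ISO week number
        "weekday": iso_weekday,  # 1 = Monday ... 7 = Sunday
    }


start = date(2016, 1, 1)
rows = [date_dimension_row(start + timedelta(days=i)) for i in range(366)]

print(len(rows))
print(rows[0])
print(rows[-1])
```

2016 is a leap year, so a full year of distinct arrival dates yields 366 rows, which matches the expected row count recorded for `dim_date` in the metadata table above.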

#### 3.2 Mapping Out Data Pipelines
Steps necessary to pipeline the data into the data model:

* Create the staging, dimension and fact tables (if the model doesn't exist).
* Truncate the dimension tables (except dim_date).
* Copy data from S3 to the staging tables and to some dimension tables.
* Insert data from the staging tables into the remaining dimension tables.
* Insert data from the staging tables into the fact table.
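
The truncate and copy steps can be sketched as plain SQL strings built in Python. This is not the project's `etl.py`; the helper names are hypothetical, the bucket and role are placeholders, and the statement shape simply follows the `TRUNCATE TABLE` and `COPY ... FORMAT AS PARQUET` queries that appear in the ETL log in Step 4.

```python
def truncate_statements(tables, keep=("dim_date",)):
    """One TRUNCATE per table, skipping tables that must keep their rows."""
    return [f"TRUNCATE TABLE {t}" for t in tables if t not in keep]


def copy_parquet_statement(table, s3_prefix, iam_role):
    """Redshift COPY statement for a folder of parquet files."""
    return (f"COPY {table}\n"
            f"FROM '{s3_prefix}'\n"
            f"IAM_ROLE '{iam_role}'\n"
            "FORMAT AS PARQUET;")


tables = ["dim_country", "dim_entry_mode", "dim_date", "staging_i94"]
truncates = truncate_statements(tables)

copy_sql = copy_parquet_statement(
    "staging_i94",
    "s3://<your-bucket>/I94-Data/",                    # placeholder bucket
    "arn:aws:iam::<account-id>:role/<role-name>",      # placeholder role
)

print("\n".join(truncates))
print(copy_sql)
```

Note the scheme difference: Spark writes to `s3a://` paths, while the Redshift `COPY` in the log reads the same bucket through an `s3://` path.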

**Data Model**
![](./Capstone_ER.jpeg)
**Data Flow**
![](./Data_Flow.jpeg)

# -------------------------------------------------------------------------------------------------------------------

### <span style="color:blue">Step 4: Running the ETL</span>
#### 4.1 Creating the data model

* **Creating a Redshift Cluster(Optional):**

In [1]:
# Creates a redshift cluster, use the create argument
!python ./src/create_redshift_cluster.py create

#### Cluster Created!!! ####
ec2.SecurityGroup(id='sg-072a9f5ed97106021')
An error occurred (InvalidPermission.Duplicate) when calling the AuthorizeSecurityGroupIngress operation: the specified rule "peer: 0.0.0.0/0, TCP, from port: 5439, to port: 5439, ALLOW" already exists
                 Key         Value
0  ClusterIdentifier    dwhcluster
1           NodeType     dc2.large
2      ClusterStatus      creating
3     MasterUsername       dwhuser
4             DBName           dwh
5              VpcId  vpc-646c301c
6      NumberOfNodes             3


* **Wait until the cluster is available**

* **Creating tables: Staging, Facts and Dimensions (First time only)**

In [2]:
# Drops all tables if exist and creates them
!python ./src/create_tables.py

### Tables Dropped Correctly! ###
### Tables Created Correctly! ###


* **Extracting from S3, Transforming and Loading to Fact and Dimension tables in Redshift**

In [3]:
!python ./src/etl.py

### Executing query: TRUNCATE TABLE dim_country
### Executing query: TRUNCATE TABLE dim_entry_mode
### Executing query: TRUNCATE TABLE dim_visa_type
### Executing query: TRUNCATE TABLE dim_entry_port
### Executing query: TRUNCATE TABLE dim_airline
### Executing query: TRUNCATE TABLE dim_destination_state
### Executing query: TRUNCATE TABLE staging_i94
### Executing query: TRUNCATE TABLE staging_demographics
### Executing query: TRUNCATE TABLE staging_states
### Tables truncated Correctly! ###
### Executing query: COPY staging_i94 
                       FROM 's3://jorge-udacity-bucket2/I94-Data/'
                       IAM_ROLE 'arn:aws:iam::923772700788:role/myRedshiftRole'
                       FORMAT AS PARQUET;
                    
### Executing query: COPY staging_demographics
                        FROM 's3://jorge-udacity-bucket2/demographics/'
                        IAM_ROLE 'arn:aws:iam::923772700788:role/myRedshiftRole'
                        FORMAT AS PARQUET;
          

#### 4.2 Data Quality Checks
Two quality checks are done:

1- Counting records: The *metadata.json* file, which contains the expected number of rows per table, is read. The test compares the expected count with the actual number of rows in each table.

2- Identifying records with null or empty values in description column for dimensional tables.
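
The two checks can be sketched as follows. This is not the project's `data_quality.py`: it uses an in-memory SQLite database as a stand-in for Redshift so the example runs anywhere, the table contents are made up, and `run_checks` is a hypothetical helper. The JSON layout mirrors what `DataFrame.to_json()` writes by default (column name, then row index, then value).

```python
import json
import sqlite3

# Shortened, made-up metadata in the default DataFrame.to_json() layout.
METADATA_JSON = json.dumps({
    "table": {"0": "dim_entry_mode", "1": "dim_airline", "2": "dim_date"},
    "desc_column": {"0": "description", "1": "description", "2": ""},
    "expected_rows": {"0": 2, "1": 2, "2": 366},
})


def run_checks(conn, metadata_json):
    """Run the row-count check and the empty-description check for each table."""
    meta = json.loads(metadata_json)
    report = []
    for key, table in meta["table"].items():
        expected = meta["expected_rows"][key]
        desc_col = meta["desc_column"][key]
        try:
            found = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
        except sqlite3.OperationalError as err:      # e.g. the table does not exist
            report.append((table, "count", f"Error: {err}"))
            continue
        status = "Passed" if found == expected else "Failed"
        report.append((table, "count", f"{status}: {found} rows, expected {expected}"))
        if desc_col:                                  # only tables with a description column
            empty = conn.execute(
                f"SELECT COUNT(*) FROM {table} "
                f"WHERE {desc_col} IS NULL OR TRIM({desc_col}) = ''"
            ).fetchone()[0]
            status = "Passed" if empty == 0 else "Failed"
            report.append((table, "description", f"{status}: {empty} null or empty"))
    return report


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_entry_mode (code INTEGER, description TEXT)")
conn.executemany("INSERT INTO dim_entry_mode VALUES (?, ?)", [(1, "Air"), (2, "Sea")])
conn.execute("CREATE TABLE dim_airline (code TEXT, description TEXT)")
conn.executemany("INSERT INTO dim_airline VALUES (?, ?)",
                 [("AA", "American Airlines"), ("2G", "")])
# dim_date is deliberately not created, to show how a missing table is reported.

report = run_checks(conn, METADATA_JSON)
for entry in report:
    print(entry)
```

Catching the error per table lets the remaining tables still be checked when one relation is missing, which is the behavior visible in the log output below.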
 
* **Running Quality Checks**

In [75]:
#run quality checks
!python ./src/data_quality.py

#### Data quality checking: counting records ####
Passed: dim_country loaded with 290 rows, expected rows:290
Passed: dim_entry_mode loaded with 5 rows, expected rows:5
Error: Data quality check failed. dim_visa_type contained 0 rows
Passed: dim_entry_port loaded with 661 rows, expected rows:661
Passed: dim_airline loaded with 197 rows, expected rows:197
Error: relation "dim_date" does not exist

Passed: dim_destination_state loaded with 55 rows, expected rows:55
Passed: student_arrivals_fact loaded with 1316324 rows, expected rows:1316324

#### Data quality checking: identifying records with null or empty values in description column ####
Passed: Data quality on table dim_country check passed with 0 NULL records
Passed: Data quality on table dim_entry_mode check passed with 0 NULL records
Passed: Data quality on table dim_visa_type check passed with 0 NULL records
Passed: Data quality on table dim_entry_port check passed with 0 NULL records
Error: Check failed. dim_airline contained 2

* **Deleting a Redshift Cluster(Optional):**

In [76]:
# Deletes the redshift cluster, use the delete argument
!python ./src/create_redshift_cluster.py delete

#### Cluster Deleted!!! ####


#### 4.3 Data dictionary 
See the file: **data dictionary.xlsx**

# -------------------------------------------------------------------------------------------------------------------

### <span style="color:blue">Step 5: Project Write Up</span>
#### 5.1 Clearly state the rationale for the choice of tools and technologies for the project.

I preprocessed the data "locally" in the workspace because the amount of data is relatively small and did not require cloud processing. I processed the I94 immigration data with Spark because it is the largest dataset; it took half the time it took in Pandas, since Spark works with parallelism and partitions. I processed the rest of the data with Pandas. Finally, I saved all the data in S3 so that Redshift can consume it easily.

I chose Redshift because it is scalable, distributed, conventional (it uses SQL), powerful and cost-effective, and because the dataset structures map naturally onto a dimensional model. Redshift is also easy to maintain, since no advanced or specialized knowledge is required.

#### 5.2 Propose how often the data should be updated and why.

For the stated purpose of the model, I believe the data could be updated weekly, given its analytical and non-operational nature.

#### 5.3 Write a description of how you would approach the problem differently under the following scenarios:
 * **The data was increased by 100x.**: I would preprocess the data in the cloud, since doing it on-premises would require powerful hardware. I would still use S3 and Redshift, which adapt easily to data growth.
 * **The data populates a dashboard that must be updated on a daily basis by 7am every day.**: 
  I would use a tool like Apache Airflow to schedule and run the data pipelines. 
 * **The database needed to be accessed by 100+ people.**: In a Redshift cluster, the node type and the number of nodes can be adjusted as needed to handle the demand.