# Capstone Project
## Udacity Data Engineering Nanodegree
### Project description and goals
The goal of this project is to build a data warehouse that serves as a source-of-truth database, giving a group of climate data scientists easier access to the data they need for their climate model and for further exploration. They want to better understand the relationship between travel to the United States, temperature, and emissions.

#### Project steps:
1. Scope the project and gather data
2. Explore, assess, and clean the data
3. Define data model
4. Run ETL to model the data
5. Project write up

In [1]:
# Imports and installs
import pandas as pd
import numpy as np

## 1. Project Scope and Gathering Data
#### Scope of project
This project aims to build an ELT pipeline that extracts raw data from S3, stages it in Redshift, and then transforms it into a set of fact and dimension tables accessible to climate data scientists. The two main tools are AWS S3 for data storage and AWS Redshift as the warehouse. Data exploration and the development of the data processing steps that precede the pipeline run are done with pandas in a notebook. SQL executed from Python is used to create and load the tables.

#### Data used
Data used are:
1. I94 Immigration Data (SAS): data from the United States National Tourism and Trade Office. It contains information on arrivals in the United States, such as date of arrival, passenger residence country, visa type, and mode of transport.

2. I94 Immigration Data Labels (SAS): data from the United States National Tourism and Trade Office. It is a data dictionary for the I94 Immigration Data to help decode some abbreviations used within the Immigration Data.

3. Monthly Arrivals Data (CSV): data from the United States National Tourism and Trade Office. It contains information on how many arrivals the United States receives per month from countries of the world, from the year 2000 until 2023.

4. World Temperature Data (CSV): data from Kaggle. It contains information on average global land temperature for each city and country in the world starting from the year 1750 until 2013.

5. World Emissions Data (CSV): data from Kaggle. It contains information on average CO2 emissions by country each year from the year 1960 until 2016.

### I94 Immigration Data

In [2]:
# Reading a sample file in csv
immigration_data = pd.read_csv('raw_data/immigration_data.csv')

In [3]:
immigration_data.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [4]:
immigration_data.dtypes

Unnamed: 0      int64
cicid         float64
i94yr         float64
i94mon        float64
i94cit        float64
i94res        float64
i94port        object
arrdate       float64
i94mode       float64
i94addr        object
depdate       float64
i94bir        float64
i94visa       float64
count         float64
dtadfile        int64
visapost       object
occup          object
entdepa        object
entdepd        object
entdepu       float64
matflag        object
biryear       float64
dtaddto        object
gender         object
insnum        float64
airline        object
admnum        float64
fltno          object
visatype       object
dtype: object

### Arrivals Data

In [5]:
arrivals_data = pd.read_csv('raw_data/arrivals_data.csv')

In [6]:
arrivals_data.head()

Unnamed: 0,1,International Visitors--  1) Country of Residence  2) 1+ nights in the USA  3) Among qualified visa types,World Region,2000-01,2000-02,2000-03,2000-04,2000-05,2000-06,2000-07,...,Unnamed: 314,Unnamed: 315,Unnamed: 316,Unnamed: 317,Unnamed: 318,Unnamed: 319,Unnamed: 320,Unnamed: 321,Unnamed: 322,Unnamed: 323
0,2.0,,,,,,,,,,...,,,,,,,,,,
1,3.0,TOTAL ALL COUNTRIES,,2866229.0,2948121.0,3709993.0,4108278.0,3712454.0,3599435.0,4947731.0,...,,,,,,,,,,
2,4.0,,,,,,,,,,...,,,,,,,,,,
3,5.0,OVERSEAS,,1694560.0,1793649.0,2056928.0,2281387.0,2178919.0,2226706.0,2642601.0,...,,,,,,,,,,
4,6.0,,,,,,,,,,...,,,,,,,,,,


In [7]:
arrivals_data.dtypes

1                                                                                                                      float64
International Visitors--\n   1) Country of Residence\n   2) 1+ nights in the USA\n   3)  Among qualified visa types     object
World \nRegion                                                                                                          object
2000-01                                                                                                                 object
2000-02                                                                                                                 object
2000-03                                                                                                                 object
2000-04                                                                                                                 object
2000-05                                                                                                        

### World Temperature Data

In [8]:
temperature_data = pd.read_csv('raw_data/temperature_data.csv')

In [9]:
temperature_data.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [10]:
temperature_data.dtypes

dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
dtype: object

### World Emissions Data

In [11]:
emissions_data = pd.read_csv('raw_data/co2_emissions_kt_by_country.csv')

In [12]:
emissions_data.head()

Unnamed: 0,country_code,country_name,year,value
0,ABW,Aruba,1960,11092.675
1,ABW,Aruba,1961,11576.719
2,ABW,Aruba,1962,12713.489
3,ABW,Aruba,1963,12178.107
4,ABW,Aruba,1964,11840.743


In [13]:
emissions_data.dtypes

country_code     object
country_name     object
year              int64
value           float64
dtype: object

## 2. Explore, Assess, Clean Data
#### Data issues
    1. I94 Immigration Data
        a. Not all column names are necessarily clear 
        b. Contains unnecessary columns that are empty or are not required in the final tables
        c. Some columns do not have the right data types
        d. Some columns (i.e., 'arrival_date' and 'departure_date') are in SAS date format
    
    2. I94 Immigration Data Labels
        a. Holds multiple lookup tables in one data dictionary file and must be split
        
    3. Arrivals Data
        a. Contains unnecessary rows that are empty or are not required in the final tables
        b. Contains unnecessary columns that are empty or are not required in the final tables
        c. Not all column names are necessarily clear
        d. Columns and rows are difficult to process and therefore must be pivoted to be more consistent
        e. Some columns do not have the right data types
        f. The date column ('arrival_date') does not have the correct date format
    
    4. World Temperature Data
        a. Contains temperature data for all cities in the world - only US data is needed
        b. Not all column names are necessarily clear
        
    5. World Emissions Data
        a. Contains emissions data for all countries in the world - only US data is needed
        b. Not all column names are necessarily clear
        

#### Cleaning steps
1. I94 Immigration Data: all columns will be renamed and those that are not required in the final tables will be dropped. SAS dates will be converted to pandas datetimes, and the data types of the other columns will be corrected where needed.
    
2. I94 Immigration Data Labels: this data dictionary will be split into 3 different files - code and country, code and port, and code and state. Code and country contains information on the country equivalent for each code that appears in I94 Immigration Data's 'i94cit' and 'i94res' columns. Code and port contains port equivalent for each code that appears in I94 Immigration Data's 'i94port' column. Code and state contains state equivalent for each code that appears in I94 Immigration Data's 'i94addr' column.
    
3. Arrivals Data: rows and columns that are not required in the final tables will be dropped. The table's format is difficult to work with (i.e., its columns are 'countries', 'world_region', and every month from 2000 until 2023). This data will be pivoted from wide to long so that its columns consist only of 'country', 'world_region', 'arrival_date' (with the former date columns as rows), and 'arrival_total' (the total arrivals for each arrival date and country). All columns will also be renamed for consistency, and data types will be corrected where needed.
    
4. World Temperature Data: data will be filtered to contain only temperature data for the United States and its cities. All columns will also be renamed for consistency.
    
5. World Emissions Data: data will be filtered to contain only emissions data for the United States. Columns will also be renamed where needed for consistency.
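
The wide-to-long pivot described in step 3 can be illustrated on a toy table. This is a minimal sketch, not the project's data: the two 2000-01 figures are taken from the sample output later in this notebook, while the 2000-02 figures are made up for illustration.

```python
import pandas as pd

# Toy wide table: one row per country, one column per month.
# The 2000-02 values are made up for illustration.
wide = pd.DataFrame({
    'country': ['Albania', 'Algeria'],
    'world_region': ['Eastern Europe', 'Africa'],
    '2000-01': [200, 234],
    '2000-02': [180, 210],
})

# Wide -> long: each former date column becomes a set of rows,
# with the column name stored in 'arrival_date'.
long = wide.melt(
    id_vars=['country', 'world_region'],
    var_name='arrival_date',
    value_name='arrival_total',
)
print(long)
```

Each country now appears once per month, which is the shape a fact or dimension table with an 'arrival_date' column needs.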

### I94 Immigration Data

In [14]:
immi_data_copy = immigration_data.copy()

In [15]:
# Renaming columns
new_col_names = ['Unnamed: 0', 'cic_id', 'year', 'month', 'citizen_country_code', 'residence_country_code', 
                 'port_arrival_code', 'arrival_date', 'travel_mode', 'state_address_code', 'departure_date', 
                 'passenger_age', 'visa', 'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 
                 'matflag', 'passenger_birth_year', 'dtaddto', 'gender', 'ins_num', 'airline_carrier', 'admission_num', 
                 'flight_num', 'visa_type']
immi_data_copy.columns = new_col_names

In [16]:
# Dropping columns
cols_to_drop = ['Unnamed: 0', 'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'dtaddto']
immi_data_copy = immi_data_copy.drop(columns=cols_to_drop)

In [17]:
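# Ensuring all data types are aligned
# Treat infinities and 'NaN' strings as missing, then fill every missing value with 0
# so that the integer casts below succeed (0 therefore acts as a "missing" placeholder)
immi_data_copy.replace([np.inf, -np.inf, 'inf', '-inf', 'NaN'], np.nan, inplace=True)
immi_data_copy.fillna(0, inplace=True)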

cols_to_int = ['cic_id', 'year', 'month', 'citizen_country_code', 'residence_country_code', 'travel_mode', 'passenger_age', 'visa', 'passenger_birth_year', 'ins_num', 'admission_num']
immi_data_copy[cols_to_int] = immi_data_copy[cols_to_int].astype(int)

cols_to_str = ['port_arrival_code', 'state_address_code', 'gender', 'airline_carrier', 'flight_num', 'visa_type']
immi_data_copy[cols_to_str] = immi_data_copy[cols_to_str].astype(str)

In [18]:
# Dropping duplicates, if any
immigration_clean = immi_data_copy.drop_duplicates()

In [19]:
# Converting SAS dates (days since 1960-01-01) to pandas datetimes
immigration_clean['arrival_date'] = pd.to_datetime('1960-01-01') + pd.to_timedelta(immigration_clean['arrival_date'], unit='D')
immigration_clean['departure_date'] = pd.to_datetime('1960-01-01') + pd.to_timedelta(immigration_clean['departure_date'], unit='D')
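
As a quick check of the SAS date conversion, the sketch below applies the same arithmetic to a few values. 20566 is the `arrdate` of the first sample row shown earlier, 20573 is simply seven days later, and 0 is the placeholder that `fillna(0)` leaves in rows with no departure date. The helper name `sas_to_datetime` is hypothetical.

```python
import pandas as pd

SAS_EPOCH = pd.Timestamp('1960-01-01')  # SAS counts days from this date


def sas_to_datetime(days):
    """Convert a Series of SAS day counts to pandas datetimes."""
    return SAS_EPOCH + pd.to_timedelta(days, unit='D')


sample = pd.Series([20566.0, 20573.0, 0.0])
converted = sas_to_datetime(sample)
print(converted.dt.strftime('%Y-%m-%d').tolist())
# ['2016-04-22', '2016-04-29', '1960-01-01']
```

The last value shows that any date filled with 0 surfaces as 1960-01-01, so downstream queries may want to treat that date as "unknown".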

In [20]:
# Final Checks
immigration_clean.head()

Unnamed: 0,cic_id,year,month,citizen_country_code,residence_country_code,port_arrival_code,arrival_date,travel_mode,state_address_code,departure_date,passenger_age,visa,passenger_birth_year,gender,ins_num,airline_carrier,admission_num,flight_num,visa_type
0,4084316,2016,4,209,209,HHW,2016-04-22,1,HI,2016-04-29,61,2,1955,F,0,JL,56582674633,00782,WT
1,4422636,2016,4,582,582,MCA,2016-04-23,1,TX,2016-04-24,26,2,1990,M,0,*GA,94361995930,XBLNG,B2
2,1195600,2016,4,148,112,OGG,2016-04-07,1,FL,2016-04-27,76,2,1940,M,0,LH,55780468433,00464,WT
3,5291768,2016,4,297,297,LOS,2016-04-28,1,CA,2016-05-07,25,2,1991,M,0,QR,94789696030,00739,B2
4,985523,2016,4,111,111,CHM,2016-04-06,3,NY,2016-04-09,19,2,1997,F,0,0,42322572633,LAND,WT


In [21]:
immigration_clean.columns

Index(['cic_id', 'year', 'month', 'citizen_country_code',
       'residence_country_code', 'port_arrival_code', 'arrival_date',
       'travel_mode', 'state_address_code', 'departure_date', 'passenger_age',
       'visa', 'passenger_birth_year', 'gender', 'ins_num', 'airline_carrier',
       'admission_num', 'flight_num', 'visa_type'],
      dtype='object')

In [22]:
immigration_clean.dtypes

cic_id                             int64
year                               int64
month                              int64
citizen_country_code               int64
residence_country_code             int64
port_arrival_code                 object
arrival_date              datetime64[ns]
travel_mode                        int64
state_address_code                object
departure_date            datetime64[ns]
passenger_age                      int64
visa                               int64
passenger_birth_year               int64
gender                            object
ins_num                            int64
airline_carrier                   object
admission_num                      int64
flight_num                        object
visa_type                         object
dtype: object

### I94 Data Labels

In [23]:
labels_data_url = 'raw_data/I94_SAS_Labels_Descriptions.SAS'

with open(labels_data_url, 'r') as file:
    lines = file.readlines()

In [24]:
# Defining which lines belong to which coded column
country_code_lines = lines[9:298]
port_code_lines = lines[302:962]
state_code_lines = lines[981:1036]

In [25]:
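# Functions to process lines into different files
def process_lines_abbreviation(line):
    """Parse "'CODE' = 'VALUE'" into (code, value), removing the quotes from both sides."""
    # Split on the first '=' only, in case the description itself contains one
    parts = line.strip().split('=', 1)
    # Remove the quotes before stripping so padding inside the quotes is dropped too
    code = parts[0].replace("'", "").strip()
    value = parts[1].replace("'", "").strip()
    return code, value

def process_lines_value(line):
    """Parse "CODE = 'VALUE'" (unquoted numeric code) into (code, value)."""
    parts = line.strip().split('=', 1)
    return parts[0].strip(), parts[1].replace("'", "").strip()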

In [26]:
# Splitting into 3 different files    
df_countries = pd.DataFrame([process_lines_value(line) for line in country_code_lines], columns=['code', 'country'])
df_port = pd.DataFrame([process_lines_abbreviation(line) for line in port_code_lines], columns=['code', 'port'])
df_state = pd.DataFrame([process_lines_abbreviation(line) for line in state_code_lines], columns=['code', 'state'])
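
To show what the line parsing does, here is a minimal, self-contained sketch. The sample lines are hypothetical, modelled on the codes and names visible in the outputs below rather than copied from the labels file, and `parse_label_line` is an illustrative helper, not one of the notebook's functions.

```python
def parse_label_line(line):
    """Split one "code = 'description'" line into a clean (code, description) pair."""

    def clean(text):
        # Drop surrounding whitespace, then quotes, then any padding inside the quotes
        return text.strip().strip("'").strip()

    # Split on the first '=' only, in case the description contains one
    code, value = line.split('=', 1)
    return clean(code), clean(value)


# Hypothetical lines in the style of the SAS labels file
sample_lines = [
    "   236 =  'AFGHANISTAN'\n",
    "   'ALC'\t=\t'ALCAN, AK             '\n",
    "\t'AL'='ALABAMA'\n",
]

pairs = [parse_label_line(line) for line in sample_lines]
print(pairs)
# [('236', 'AFGHANISTAN'), ('ALC', 'ALCAN, AK'), ('AL', 'ALABAMA')]
```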

In [27]:
# Final checks
df_countries.head()

Unnamed: 0,code,country
0,582,"MEXICO Air Sea, and Not Reported (I-94, no lan..."
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [28]:
df_countries.columns

Index(['code', 'country'], dtype='object')

In [29]:
df_port.head()

Unnamed: 0,code,port
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


In [30]:
df_port.columns

Index(['code', 'port'], dtype='object')

In [31]:
df_state.head()

Unnamed: 0,code,state
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [32]:
df_state.columns

Index(['code', 'state'], dtype='object')

### Arrivals Data

In [33]:
arrivals_data_copy = arrivals_data.copy()

In [34]:
# Dropping rows
arrivals_useful_rows = arrivals_data_copy.iloc[19:254]

In [35]:
# Dropping columns
arrivals_useful_cols = arrivals_useful_rows.drop(arrivals_useful_rows.columns[0], axis=1)

drop_these_cols = arrivals_useful_cols.columns.get_loc('2023-10')
arrivals_clean = arrivals_useful_cols.drop(arrivals_useful_cols.columns[drop_these_cols+1:], axis=1)

In [36]:
# Renaming columns
arrivals_final = arrivals_clean.rename(columns={'International Visitors--\n   1) Country of Residence\n   2) 1+ nights in the USA\n   3)  Among qualified visa types': 'country', 'World \nRegion': 'world_region'})

In [37]:
# Pivoting
arrivals_pivoted_df = arrivals_final.melt(id_vars=['country', 'world_region'], var_name='arrival_date', value_name='arrival_total')

In [38]:
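# Ensuring date formats are correct
# Zero-pad each part (e.g. '2000-1' -> '2000-01'), then use the real last day of each month;
# appending '-31' to every month would produce invalid dates such as '2000-02-31'
padded_dates = arrivals_pivoted_df['arrival_date'].apply(lambda x: '-'.join(part.zfill(2) for part in x.split('-')))
month_ends = pd.to_datetime(padded_dates, format='%Y-%m') + pd.offsets.MonthEnd(0)
arrivals_pivoted_df['arrival_date'] = month_ends.dt.strftime('%Y-%m-%d')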

In [39]:
# Ensuring data types are aligned
arrivals_pivoted_df['arrival_total'] = arrivals_pivoted_df['arrival_total'].str.replace(',', '')
arrivals_pivoted_df['arrival_total'] = arrivals_pivoted_df['arrival_total'].replace(' -   ', 0)

arrivals_pivoted_final = arrivals_pivoted_df.fillna(0)

arrivals_pivoted_final['arrival_total'] = arrivals_pivoted_final['arrival_total'].astype(int)
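
The same clean-up of 'arrival_total' can be sketched on a few made-up values. Note that this sketch swaps in `pd.to_numeric(..., errors='coerce')` rather than the explicit `replace(' -   ', 0)` used above, so that any non-numeric placeholder is treated as missing. It is an alternative under that assumption, not the notebook's method.

```python
import pandas as pd

# Made-up raw values in the style of the arrivals file:
# thousands separators, a dash placeholder, and a missing cell
raw = pd.Series(['1,234', ' -   ', None, '56'])

cleaned = (
    pd.to_numeric(
        raw.str.replace(',', '', regex=False).str.strip(),
        errors='coerce',  # anything non-numeric, including '-', becomes NaN
    )
    .fillna(0)      # missing and placeholder cells count as zero arrivals
    .astype(int)
)
print(cleaned.tolist())
# [1234, 0, 0, 56]
```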

In [40]:
# Final Checks
arrivals_pivoted_final.head()

Unnamed: 0,country,world_region,arrival_date,arrival_total
0,Afghanistan,Asia,2000-01-31,3
1,Albania,Eastern Europe,2000-01-31,200
2,Algeria,Africa,2000-01-31,234
3,Andorra,Western Europe,2000-01-31,110
4,Angola,Africa,2000-01-31,333


In [41]:
arrivals_pivoted_final.columns

Index(['country', 'world_region', 'arrival_date', 'arrival_total'], dtype='object')

In [42]:
arrivals_pivoted_final.dtypes

country          object
world_region     object
arrival_date     object
arrival_total     int64
dtype: object

### World Temperature Data

In [43]:
temp_data_copy = temperature_data.copy()

In [44]:
# Filtering to United States
temperature_clean = temp_data_copy[temp_data_copy['Country'] == 'United States']

In [45]:
# Renaming columns
renamed_cols = ['date', 'avg_temp', 'avg_temp_uncertainty', 'city', 'country', 'latitude', 'longitude']
temperature_clean.columns = renamed_cols

In [46]:
# Final checks
temperature_clean.head()

Unnamed: 0,date,avg_temp,avg_temp_uncertainty,city,country,latitude,longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


In [47]:
temperature_clean.columns

Index(['date', 'avg_temp', 'avg_temp_uncertainty', 'city', 'country',
       'latitude', 'longitude'],
      dtype='object')

In [48]:
temperature_clean.dtypes

date                     object
avg_temp                float64
avg_temp_uncertainty    float64
city                     object
country                  object
latitude                 object
longitude                object
dtype: object

### World Emissions Data

In [49]:
emi_data_copy = emissions_data.copy()

In [50]:
# Filtering to United States
emissions_clean = emi_data_copy[emi_data_copy['country_name'] == 'United States']

In [51]:
# Renaming columns
emissions_clean = emissions_clean.rename(columns={'value': 'emissions_kt'})

In [52]:
# Final checks
emissions_clean.head()

Unnamed: 0,country_code,country_name,year,emissions_kt
13209,USA,United States,1960,2890696.1
13210,USA,United States,1961,2880505.507
13211,USA,United States,1962,2987207.873
13212,USA,United States,1963,3119230.874
13213,USA,United States,1964,3255995.306


In [53]:
emissions_clean.columns

Index(['country_code', 'country_name', 'year', 'emissions_kt'], dtype='object')

In [54]:
emissions_clean.dtypes

country_code     object
country_name     object
year              int64
emissions_kt    float64
dtype: object

## 3. Define the Data Model
#### 3.1 Conceptual data model
The data model can be found in the file 'data_model.pdf'.

#### 3.2 Mapping out data pipelines
1. Data processing is run locally and clean files are uploaded into S3 bucket (s3://bucket-name-here/processed_data/)
2. Staging tables are dropped and then created in Redshift
3. Data from S3 is copied into staging tables
4. Immigration fact table is created
5. Passenger, arrivals, temperature, and emissions dimension tables are created
6. Final tables are uploaded back into S3 bucket (s3://bucket-name-here/final_tables/)
7. Data quality checks are run
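
Step 3 is typically done with Redshift's `COPY` command. The sketch below only builds the SQL string; it does not connect to Redshift. The table name, file name, and IAM role ARN are hypothetical placeholders, and the actual statements in etl.py are not shown in this notebook and may differ.

```python
def build_copy_sql(table, s3_path, iam_role_arn):
    """Return a Redshift COPY statement that loads a CSV file with a header row."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "CSV\n"
        "IGNOREHEADER 1;"
    )


sql = build_copy_sql(
    # Hypothetical staging table name
    table='staging_immigration',
    # Bucket prefix from the steps above; the file name is hypothetical
    s3_path='s3://bucket-name-here/processed_data/immigration.csv',
    # Placeholder role ARN, not a real role
    iam_role_arn='arn:aws:iam::123456789012:role/example-redshift-role',
)
print(sql)
```

Keeping the statement in a small builder function makes it easy to reuse one template for every staging table.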

## 4. Run Pipelines to Model the Data

#### 4.1 Create the data model
create_tables.py and etl.py are the two scripts to be run to create the data model. Below is the code to execute the two scripts. %%time is included to track the time taken for each script to run.

In [55]:
%%time
!python create_tables.py

CPU times: user 72.2 ms, sys: 18.1 ms, total: 90.3 ms
Wall time: 4.51 s


In [56]:
%%time
!python etl.py

CPU times: user 254 ms, sys: 47.5 ms, total: 302 ms
Wall time: 17 s


#### 4.2 Data quality checks
Data quality checks will ensure:
1. Fact and dimension tables are not empty after triggering ETL pipeline
2. Records in all fact and dimension tables are unique (there are no duplicates)

data_quality_checks.py is the script to run to perform data quality checks. Below is the code to execute the script. Logging is included to make sure data quality check outcomes are clear.
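
The two checks can be sketched as follows. This is an illustration under stated assumptions, not the contents of data_quality_checks.py: an in-memory SQLite database stands in for Redshift, uniqueness is tested on a single key column (`cic_id` here), and the helper name `check_table` is hypothetical.

```python
import logging
import sqlite3

logging.basicConfig(level=logging.INFO)


def check_table(conn, table, key_column):
    """Return True when `table` has rows and `key_column` holds no duplicates."""
    total = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    if total == 0:
        logging.error("Data quality check failed, %s is empty.", table)
        return False

    # COUNT(DISTINCT ...) ignores NULLs, so NULL keys also show up as a mismatch
    distinct = conn.execute(
        f"SELECT COUNT(DISTINCT {key_column}) FROM {table}"
    ).fetchone()[0]
    if distinct != total:
        logging.error("Data quality check failed, %s has duplicated data.", table)
        return False

    logging.info("Data quality check success, %s is not empty and has no duplicates.", table)
    return True


# Stand-in database with two rows taken from the sample output above
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE fact_immigration (cic_id INTEGER, year INTEGER)")
conn.executemany(
    "INSERT INTO fact_immigration VALUES (?, ?)",
    [(4084316, 2016), (4422636, 2016)],
)

ok = check_table(conn, 'fact_immigration', 'cic_id')
print(ok)
```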

In [57]:
from data_quality_checks import data_quality_checks

In [58]:
data_quality_checks()

INFO:root:Performing data quality checks on fact_immigration...
INFO:root:Checking if fact_immigration is not an empty table...
INFO:root:Data quality check success, fact_immigration not empty.
INFO:root:Checking if fact_immigration has duplicates...
INFO:root:Data quality check success, fact_immigration has no duplicated data.
INFO:root:Performing data quality checks on dim_passenger...
INFO:root:Checking if dim_passenger is not an empty table...
INFO:root:Data quality check success, dim_passenger not empty.
INFO:root:Checking if dim_passenger has duplicates...
INFO:root:Data quality check success, dim_passenger has no duplicated data.
INFO:root:Performing data quality checks on dim_arrivals...
INFO:root:Checking if dim_arrivals is not an empty table...
INFO:root:Data quality check success, dim_arrivals not empty.
INFO:root:Checking if dim_arrivals has duplicates...
INFO:root:Data quality check success, dim_arrivals has no duplicated data.
INFO:root:Performing data quality checks on d

#### 4.3 Data dictionary
Data dictionary can be found in file 'data_dictionary.pdf'.

## 5. Project Write Up
### 5.1 Rationale for choice of tools and technologies
1. AWS S3: S3 was used for data storage because it is highly scalable and can store large volumes of data. Redshift can also load data directly from S3 with the COPY command, which keeps staging simple and efficient.
2. AWS Redshift: Redshift was used to stage data and store the final tables because it is optimised for storing, querying, and analysing large volumes of data, which this project has. Redshift also integrates with other AWS services, should the user require further data handling.
3. Pandas: pandas was used for the data processing step because it is simple to use and offers a wide range of data manipulation functions.
### 5.2 How often data should be updated
Immigration, arrivals, and temperature data should be updated on a monthly basis if possible, to ensure the data is always up to date for the climate model. Emissions data is reported per year and should therefore be updated yearly.
### 5.3 Problem scenarios
##### Data was increased by 100x
Redshift can handle larger datasets, although an increase in cluster resources might be required. Additionally, using Spark to process the larger datasets is recommended, because the current pandas-based processing runs in memory on a single machine.
##### Data populates a dashboard that must be updated on a daily basis by 7am every day
Apache Airflow can be used to support this requirement. It lets the user schedule the ETL pipeline to run as frequently as needed and at a chosen time, for example daily and early enough to finish before 7am.
##### The database needs to be accessed by 100+ people
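Redshift is a large-scale database that can be accessed by 100+ people and supports multiple users querying data concurrently. If queries start to queue under that load, Redshift features such as workload management and concurrency scaling can be used to handle it.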