# Project: A look at immigration data into the United States
### Data Engineering Capstone Project

#### Project Summary
This project aims to create an ETL pipeline using the datasets provided by Udacity.

The analytical database created could then aid analysts in identifying trends and patterns in immigration into the US. For example, they could try and identify whether there is a correlation between a cities demographic breakdown and the number of immigrants who's final address is that city or identify which type of visa is the most popular amongst US immigrants.

The project is broken down into the following steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import pandas as pd
import datetime as dt

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

In [3]:
from pyspark.sql.functions import udf,col

In [4]:
import re

In [5]:
from datetime import datetime, timedelta
from pyspark.sql import types as T

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

- I94 Immigration Data: This data comes from the US National Tourism and Trade Office. Data sourced from: https://www.trade.gov/national-travel-and-tourism-office
- World Temperature Data: This is a table of temperature data of cities around the World.Data sourced from: https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data
- U.S. City Demographic Data: This is a table of US cities and their demographics. Data sourced from: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/
- Airport Code Table: This is a simple table of airport codes and corresponding cities. Date sourced from: https://datahub.io/core/airport-codes#data

**I94 DATA**

*Data Dictionary*

|Column Name          |Description|
|--------------|------|
|cicid          |Unique identifier     |
|i94yr         |4 digit year of arrival    |
|i94cit         |Country of citizenship    |
|i94res        |Country of residence    |
|i94mode          |Mode of transport into the U.S.    |
|i94bir         |Age of non-immigrant     |
|i94visa      |Code for visa type     |
|i94addr         |Final address of the migrants     |
|arrdate       |Arrival date     |
|depdate         |Departure date    |
|occupation         |Occupation of immigrant |
|biryear         |4 digit year of birth |
|gender         |Non-immigrant sex |
|i94mon         |Numeric month|
|i94port         |Port of entry code |
|i94bir         |Age of Respondent in Years|
|count         |Used for summary statistics |
|dtadfile        |Character Date Field - Date added to I-94 Files |
|visapost         |Department of State where where Visa was issued |
|entdepa         |Arrival Flag - admitted or paroled into the U.S. |
|entdepd         |Departure Flag - Departed, lost I-94 or is deceased |
|entdepu         |Update Flag - Either apprehended, overstayed, adjusted to perm residence |
|matflag         |Match flag - Match of arrival and departure records |
|dtaddto         |Character Date Field - Date to which admitted to U.S. (allowed to stay until) |
|insnum        |INS number |
|airline         |Airline used to arrive in U.S. |
|admnum        |Admission Number |
|fltno         |Flight number of Airline used to arrive in U.S. |
|visatype         |Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |

**WORLD TEMPERATURE DATA**

*Data Dictionary*

|Column Name          |Description|
|--------------|------|
|dt        |Date     |
|AverageTemperature         |Average Temp. (Celsius)    |
|AverageTemperatureUncertainty       |95% confidence interval around the average    |
|City        |City name     |
|Country        |Country     |
|Latitude        |City Latitude    |
|Longitude       |City Longitude   |


**U.S DEMOGRAPHICS DATA**

*Data Dictionary*

|Column Name          |Description|
|--------------|------|
|City          |City name     |
|State         |State mame    |
|Median Age         |Median age of the population    |
|Male Population        |Male population count     |
|Female Population          |Female population count     |
|Total Population         |Total population     |
|Number of Veterans       |Number of Veterans in the city     |
|Foreign-born         |Number of people who were not born in the city     |
|Average Household Size         |Average size of a house     |
|State Code         |State code     |
|Race          |Race     |
|Count         |Number of people in each Race category |



**AIRPORT CODES DATA**

*Data Dictionary*

|Column Name          |Description|
|--------------|------|
|ident          |Identifier     |
|type        |Type of airport  |
|name        |Name of airport   |
|elevation_ft        |Elevation of the airport    |
|continent         |Continent    |
|iso_country        |ISO code of the country that the airport resides in    |
|iso_region      |ISO code of the region that the airport resides in    |
|municipality         |Municipality of the airport    |
|gps_code         |GPS code   |
|iata_code        |IATA code   |
|local_code         |Local code    |
|coordinates       |Coordinates of the airport |

### Step 2: Explore and Assess the Data

#### Explore the Data
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

### U.S. Demographics Data
-------

1. Initial checks i.e. shape of df, columns, data types of columns etc.
2. Check for NaN values
3. Calculations
4. Dropping uneccessary columns
5. Renaming columns
6. Saving to new CSV

In [3]:
# Reading in the CSV
demographics_df = pd.read_csv('us-cities-demographics.csv', sep=';')

In [8]:
demographics_df.shape

(2891, 12)

In [7]:
demographics_df.columns

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')

In [13]:
# Checking data types of each column
demographics_df.dtypes

City                       object
State                      object
Median Age                float64
Male Population           float64
Female Population         float64
Total Population            int64
Number of Veterans        float64
Foreign-born              float64
Average Household Size    float64
State Code                 object
Race                       object
Count                       int64
dtype: object

In [9]:
demographics_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [10]:
demographics_df.describe()

Unnamed: 0,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,Count
count,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891.0
mean,35.494881,97328.43,101769.6,198966.8,9367.832523,40653.6,2.742543,48963.77
std,4.401617,216299.9,231564.6,447555.9,13211.219924,155749.1,0.433291,144385.6
min,22.9,29281.0,27348.0,63215.0,416.0,861.0,2.0,98.0
25%,32.8,39289.0,41227.0,80429.0,3739.0,9224.0,2.43,3435.0
50%,35.3,52341.0,53809.0,106782.0,5397.0,18822.0,2.65,13780.0
75%,38.0,86641.75,89604.0,175232.0,9368.0,33971.75,2.95,54447.0
max,70.5,4081698.0,4468707.0,8550405.0,156961.0,3212500.0,4.98,3835726.0


In [11]:
# Checking how many U.S. cities this table covers
demographics_df['City'].nunique()

567

The demographics dataset contains data for 567 U.S. cities.

In [12]:
# Checking how many U.S. states this table covers
demographics_df['State'].nunique()

49

The demographics dataset contains data for 49 U.S. states.

**Checking for NaN values**

In [14]:
demographics_df.isnull().values.any()

True

In [15]:
demographics_df.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

There are 5 columns in the demographics dataset that have NaN values: Male Population, Female Population, Number of veterans, Foreigh-born, Average Household Size.

Given that there are nearly 3000 records in this table, the number of NaN values looks to be relatively low.

In [16]:
# Calculating the percentage of NaN values for each column

nan_demographics_df = pd.DataFrame(data=demographics_df.isnull().sum(), columns=['NaN'])

nan_demographics_df.drop(nan_demographics_df[nan_demographics_df['NaN'] == 0].index, inplace = True)

nan_demographics_df['% of NaN'] = (nan_demographics_df['NaN']/demographics_df.count())*100
nan_demographics_df

Unnamed: 0,NaN,% of NaN
Male Population,3,0.103878
Female Population,3,0.103878
Number of Veterans,13,0.451703
Foreign-born,13,0.451703
Average Household Size,16,0.556522


The % of NaN values in each column is negligible therefore, we will not drop any rows.

**Checking for duplicate data**

When filtering the dataframe for a specific city, you can see that the data for all columns except 'Race' & 'Count' is duplicated.

We will transform the table so that each Race category is it's own column.

In [17]:
demographics_df[demographics_df['City'] == 'Silver Spring']

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
592,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,White,37756
1678,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Black or African-American,21330
2123,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,American Indian and Alaska Native,1084
2162,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Asian,8841


In [18]:
demographics_df['Race'].unique()

array(['Hispanic or Latino', 'White', 'Asian', 'Black or African-American',
       'American Indian and Alaska Native'], dtype=object)

In [19]:
demographics_df = demographics_df.set_index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race']).Count.unstack().reset_index()
demographics_df.columns.name = None
demographics_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,1813.0,2929.0,14449.0,33222.0,95487.0
1,Akron,Ohio,38.1,96886.0,100667.0,197553,12878.0,10024.0,2.24,OH,1845.0,9033.0,66551.0,3684.0,129192.0
2,Alafaya,Florida,33.5,39504.0,45760.0,85264,4176.0,15842.0,2.94,FL,,10336.0,6577.0,34897.0,63666.0
3,Alameda,California,41.4,37747.0,40867.0,78614,4504.0,18841.0,2.52,CA,1329.0,27984.0,7364.0,8265.0,44232.0
4,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,445.0,650.0,53440.0,1783.0,17160.0


Now, when we check the shape of the DataFrame, we can see that the duplicated rows have been removed.

In [20]:
demographics_df.shape

(596, 15)

But the number of unique Cities and States is the same as before the dataset was transformed.

In [21]:
demographics_df['City'].nunique()

567

In [22]:
demographics_df['State'].nunique()

49

In [23]:
demographics_df.duplicated().sum()

0

In [24]:
demographics_df.isna().sum()

City                                  0
State                                 0
Median Age                            0
Male Population                       1
Female Population                     1
Total Population                      0
Number of Veterans                    7
Foreign-born                          7
Average Household Size                8
State Code                            0
American Indian and Alaska Native    57
Asian                                13
Black or African-American            12
Hispanic or Latino                    0
White                                 7
dtype: int64

In [25]:
# Calculating the percentage of NaN values for each column

nan_demographics_df_2 = pd.DataFrame(data=demographics_df.isnull().sum(), columns=['NaN'])

nan_demographics_df_2.drop(nan_demographics_df_2[nan_demographics_df_2['NaN'] == 0].index, inplace = True)

nan_demographics_df_2['% of NaN'] = (nan_demographics_df_2['NaN']/demographics_df.count())*100
nan_demographics_df_2

Unnamed: 0,NaN,% of NaN
Male Population,1,0.168067
Female Population,1,0.168067
Number of Veterans,7,1.188455
Foreign-born,7,1.188455
Average Household Size,8,1.360544
American Indian and Alaska Native,57,10.575139
Asian,13,2.229846
Black or African-American,12,2.054795
White,7,1.188455


In [136]:
# Removing name of index column
# demographics_df = demographics_df.rename_axis(None, axis=1)

In [26]:
# Creating a new df
dems_df = demographics_df

In [27]:
dems_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,1813.0,2929.0,14449.0,33222.0,95487.0
1,Akron,Ohio,38.1,96886.0,100667.0,197553,12878.0,10024.0,2.24,OH,1845.0,9033.0,66551.0,3684.0,129192.0
2,Alafaya,Florida,33.5,39504.0,45760.0,85264,4176.0,15842.0,2.94,FL,,10336.0,6577.0,34897.0,63666.0
3,Alameda,California,41.4,37747.0,40867.0,78614,4504.0,18841.0,2.52,CA,1329.0,27984.0,7364.0,8265.0,44232.0
4,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,445.0,650.0,53440.0,1783.0,17160.0


**Calculations**

1. Finding the % breakdown of the following columns: Male Population, Female Population per City
2. Finding the % breakdwon of the following columns: American Indian and Alaska Native, Asian, Black or African-American, Hispanic or Latino and White per City
3. Finding the % of Foreign born per City

*% breakdown per Gender*

Adding two new columns: 'pct_male_pop' and 'pct_female_pop'

In [28]:
dems_df['pct_male_pop'] = (dems_df['Male Population']/dems_df['Total Population'])*100
dems_df['pct_female_pop'] = (dems_df['Female Population']/dems_df['Total Population'])*100

In [29]:
dems_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White,pct_male_pop,pct_female_pop
0,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,1813.0,2929.0,14449.0,33222.0,95487.0,51.80654,48.19346
1,Akron,Ohio,38.1,96886.0,100667.0,197553,12878.0,10024.0,2.24,OH,1845.0,9033.0,66551.0,3684.0,129192.0,49.043042,50.956958
2,Alafaya,Florida,33.5,39504.0,45760.0,85264,4176.0,15842.0,2.94,FL,,10336.0,6577.0,34897.0,63666.0,46.331394,53.668606
3,Alameda,California,41.4,37747.0,40867.0,78614,4504.0,18841.0,2.52,CA,1329.0,27984.0,7364.0,8265.0,44232.0,48.015621,51.984379
4,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,445.0,650.0,53440.0,1783.0,17160.0,44.572417,55.427583


In [65]:
# gender_dems_df = demographics_df.groupby(['State'])['Male Population', 'Female Population', 'Total Population'].sum()
# gender_dems_df

% breakdown per Race category

In [30]:
# American Indian and Alaska Native
dems_df['pct_native'] = (dems_df['American Indian and Alaska Native']/dems_df['Total Population'])*100

# Asian
dems_df['pct_asian'] = (dems_df['Asian']/dems_df['Total Population'])*100

# Black or African-American
dems_df['pct_black'] = (dems_df['Black or African-American']/dems_df['Total Population'])*100

# Hispanic or Latino
dems_df['pct_hispanic'] = (dems_df['Hispanic or Latino']/dems_df['Total Population'])*100

# White
dems_df['pct_white'] = (dems_df['White']/dems_df['Total Population'])*100

In [31]:
dems_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,...,Black or African-American,Hispanic or Latino,White,pct_male_pop,pct_female_pop,pct_native,pct_asian,pct_black,pct_hispanic,pct_white
0,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,...,14449.0,33222.0,95487.0,51.80654,48.19346,1.440306,2.326893,11.478757,26.39264,75.857987
1,Akron,Ohio,38.1,96886.0,100667.0,197553,12878.0,10024.0,2.24,OH,...,66551.0,3684.0,129192.0,49.043042,50.956958,0.933927,4.572444,33.687669,1.864816,65.396122
2,Alafaya,Florida,33.5,39504.0,45760.0,85264,4176.0,15842.0,2.94,FL,...,6577.0,34897.0,63666.0,46.331394,53.668606,,12.122349,7.713689,40.928176,74.669263
3,Alameda,California,41.4,37747.0,40867.0,78614,4504.0,18841.0,2.52,CA,...,7364.0,8265.0,44232.0,48.015621,51.984379,1.690539,35.596713,9.367288,10.513395,56.264787
4,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,...,53440.0,1783.0,17160.0,44.572417,55.427583,0.6258,0.91409,75.152231,2.507418,24.131966


% breakdown of Foreign Born

In [32]:
dems_df['pct_foreign_born'] = (dems_df['Foreign-born']/dems_df['Total Population'])*100

% breakdown of Veterans

In [33]:
dems_df['pct_veterans'] = (dems_df['Number of Veterans']/dems_df['Total Population'])*100

In [34]:
dems_df.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,...,White,pct_male_pop,pct_female_pop,pct_native,pct_asian,pct_black,pct_hispanic,pct_white,pct_foreign_born,pct_veterans
0,Abilene,Texas,31.3,65212.0,60664.0,125876,9367.0,8129.0,2.64,TX,...,95487.0,51.80654,48.19346,1.440306,2.326893,11.478757,26.39264,75.857987,6.457943,7.44145
1,Akron,Ohio,38.1,96886.0,100667.0,197553,12878.0,10024.0,2.24,OH,...,129192.0,49.043042,50.956958,0.933927,4.572444,33.687669,1.864816,65.396122,5.074081,6.518757
2,Alafaya,Florida,33.5,39504.0,45760.0,85264,4176.0,15842.0,2.94,FL,...,63666.0,46.331394,53.668606,,12.122349,7.713689,40.928176,74.669263,18.57994,4.897729
3,Alameda,California,41.4,37747.0,40867.0,78614,4504.0,18841.0,2.52,CA,...,44232.0,48.015621,51.984379,1.690539,35.596713,9.367288,10.513395,56.264787,23.966469,5.729259
4,Albany,Georgia,33.3,31695.0,39414.0,71109,5409.0,861.0,2.38,GA,...,17160.0,44.572417,55.427583,0.6258,0.91409,75.152231,2.507418,24.131966,1.210817,7.606632


In [35]:
dems_df.columns

Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code',
       'American Indian and Alaska Native', 'Asian',
       'Black or African-American', 'Hispanic or Latino', 'White',
       'pct_male_pop', 'pct_female_pop', 'pct_native', 'pct_asian',
       'pct_black', 'pct_hispanic', 'pct_white', 'pct_foreign_born',
       'pct_veterans'],
      dtype='object')

Dropping Columns

In [36]:
dems_df = dems_df.drop(['Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'American Indian and Alaska Native', 'Asian',
       'Black or African-American', 'Hispanic or Latino', 'White',], axis=1)
dems_df.columns

Index(['City', 'State', 'Median Age', 'Total Population',
       'Average Household Size', 'State Code', 'pct_male_pop',
       'pct_female_pop', 'pct_native', 'pct_asian', 'pct_black',
       'pct_hispanic', 'pct_white', 'pct_foreign_born', 'pct_veterans'],
      dtype='object')

Renaming columns

In [37]:
dems_df.rename(columns={'City': 'city', 'State': 'state', 'Median Age': 'median_age', 'Total Population': 'tot_pop', 'Average Household Size': 'avg_house_size'}, inplace=True)
dems_df.columns

Index(['city', 'state', 'median_age', 'tot_pop', 'avg_house_size',
       'State Code', 'pct_male_pop', 'pct_female_pop', 'pct_native',
       'pct_asian', 'pct_black', 'pct_hispanic', 'pct_white',
       'pct_foreign_born', 'pct_veterans'],
      dtype='object')

In [47]:
dems_df.rename(columns={'State Code': 'state_code'}, inplace=True)
dems_df.columns

Index(['city', 'state', 'median_age', 'tot_pop', 'avg_house_size',
       'state_code', 'pct_male_pop', 'pct_female_pop', 'pct_native',
       'pct_asian', 'pct_black', 'pct_hispanic', 'pct_white',
       'pct_foreign_born', 'pct_veterans'],
      dtype='object')

In [48]:
# Saving the clean df to a new csv
dems_df.to_csv('demographics_data_clean.csv', index=False)

### Immigration dataset
-----

1. Check the sample dataset
2. Read in data for Apr 2016
3. Converting data types of certain columns
4. Dropping columns
5. Joining to i94_port.csv Spark dataframe
6. Saving to parquet file

N.B. For exploratory purposes only, a small subset of the Immigration dataset will be used below for initial checks.

In [62]:
# Reading in the immigration data subset into a pandas df

immigration_df = pd.read_csv('immigration_data_sample.csv')

In [63]:
immigration_df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [64]:
immigration_df.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [65]:
immigration_df.shape

(1000, 29)

**Reading in the whole Immigration dataset** 

We will be using Spark to work with the large Immigration dataset.

Spark has a library which supports the format that the Immigration dataset is in - .sas7bdat.


*Reading in the Immigration data for April 2016 into Spark*

In [66]:
# Referenced from the following knowledge thread: https://knowledge.udacity.com/questions/437987
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df_spark = spark.read.format('com.github.saurfang.sas.spark').load(fname)

In [67]:
immigration_df_spark.count()

3096313

In [68]:
immigration_df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [69]:
immigration_df_spark.show(2)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null|1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SEO| n

**Converting arrdate and depdate**

In [91]:
# Referenced from the following knowledge thread: https://knowledge.udacity.com/questions/66798
# from datetime import datetime, timedelta
# from pyspark.sql import types as T

def convert_to_datetime(x):
    try:
        start = datetime(1960, 1, 1)
        return start + timedelta(days=int(x))
    except:
        return None
    
convert_date = udf(lambda x: convert_to_datetime(x), T.DateType())

imm_df_spark = immigration_df_spark \
    .withColumn("arrival_date", convert_date(immigration_df_spark.arrdate)) \
    .withColumn("departure_date", convert_date(immigration_df_spark.depdate))

In [92]:
imm_df_spark.show(2)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+--------------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|       admnum|fltno|visatype|arrival_date|departure_date|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+-------------+-----+--------+------------+--------------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null|1.897628485E9| null|      B2|  2016-04-29|          null|
|  7

**Dropping columns**

In [96]:
drop_list = ['dtadfile', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'dtaddto', 'insnum', 'fltno', 'arrdate', 'depdate', 'count']
imm_df_spark = imm_df_spark.drop(*drop_list)

In [97]:
imm_df_spark.columns

['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94port',
 'i94mode',
 'i94addr',
 'i94bir',
 'i94visa',
 'visapost',
 'occup',
 'biryear',
 'gender',
 'airline',
 'admnum',
 'visatype',
 'arrival_date',
 'departure_date']

In [103]:
# Write to parquet
imm_df_spark.write.parquet("immigration_data")

In [33]:
# Read parquet
imm_df=spark.read.parquet("immigration_data")

In [34]:
imm_df.count()

3096313

In [35]:
imm_df.show(2)

+---------+------+------+------+------+-------+-------+-------+------+-------+--------+-----+-------+------+-------+--------------+--------+------------+--------------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|i94mode|i94addr|i94bir|i94visa|visapost|occup|biryear|gender|airline|        admnum|visatype|arrival_date|departure_date|
+---------+------+------+------+------+-------+-------+-------+------+-------+--------+-----+-------+------+-------+--------------+--------+------------+--------------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|    1.0|     CA|  40.0|    1.0|     SYD| null| 1976.0|     F|     QF|9.495387003E10|      B1|  2016-04-30|    2016-05-08|
|5748518.0|2016.0|   4.0| 245.0| 438.0|    LOS|    1.0|     NV|  32.0|    1.0|     SYD| null| 1984.0|     F|     VA|9.495562283E10|      B1|  2016-04-30|    2016-05-17|
+---------+------+------+------+------+-------+-------+-------+------+-------+--------+-----+-------+------+-------+--------------+--------+------------+--

**Adding port_city and port_state columns to immigration df**

In [36]:
i94_port_df = pd.read_csv('i94_port.csv')

In [37]:
i94_port_df.head()

Unnamed: 0,port_code,port_city,port_state
0,ALC,Alcan,AK
1,ANC,Anchorage,AK
2,BAR,Baker Aaf - Baker Island,AK
3,DAC,Daltons Cache,AK
4,PIZ,Dew Station Pt Lay Dew,AK


In [38]:
i94_port_spark = spark.createDataFrame(i94_port_df)

In [39]:
i94_port_spark.printSchema()

root
 |-- port_code: string (nullable = true)
 |-- port_city: string (nullable = true)
 |-- port_state: string (nullable = true)



In [40]:
i94_port_spark.show(2)

+---------+---------+----------+
|port_code|port_city|port_state|
+---------+---------+----------+
|      ALC|    Alcan|        AK|
|      ANC|Anchorage|        AK|
+---------+---------+----------+
only showing top 2 rows



In [41]:
imm_df = imm_df.join(i94_port_spark, imm_df.i94port==i94_port_spark.port_code, how='left')

In [42]:
imm_df.show(2)

+---------+------+------+------+------+-------+-------+-------+------+-------+--------+-----+-------+------+-------+--------------+--------+------------+--------------+---------+---------+----------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|i94mode|i94addr|i94bir|i94visa|visapost|occup|biryear|gender|airline|        admnum|visatype|arrival_date|departure_date|port_code|port_city|port_state|
+---------+------+------+------+------+-------+-------+-------+------+-------+--------+-----+-------+------+-------+--------------+--------+------------+--------------+---------+---------+----------+
|5761355.0|2016.0|   4.0| 297.0| 297.0|    BGM|    1.0|   null|  63.0|    1.0|     DOH| null| 1953.0|     F|    348|9.499065963E10|      B1|  2016-04-30|          null|      BGM|   Bangor|        ME|
|5761356.0|2016.0|   4.0| 297.0| 297.0|    BGM|    1.0|   null|  43.0|    1.0|     DOH| null| 1973.0|     M|    348|9.499028833E10|      B1|  2016-04-30|    2016-08-08|      BGM|   Bangor|        ME|


In [43]:
imm_df.count()

3096313

In [45]:
imm_df = imm_df.withColumnRenamed("i94addr", "us_arrival_state")

In [46]:
imm_df.show(2)

+---------+------+------+------+------+-------+-------+----------------+------+-------+--------+-----+-------+------+-------+--------------+--------+------------+--------------+---------+---------+----------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|i94mode|us_arrival_state|i94bir|i94visa|visapost|occup|biryear|gender|airline|        admnum|visatype|arrival_date|departure_date|port_code|port_city|port_state|
+---------+------+------+------+------+-------+-------+----------------+------+-------+--------+-----+-------+------+-------+--------------+--------+------------+--------------+---------+---------+----------+
|5761355.0|2016.0|   4.0| 297.0| 297.0|    BGM|    1.0|            null|  63.0|    1.0|     DOH| null| 1953.0|     F|    348|9.499065963E10|      B1|  2016-04-30|          null|      BGM|   Bangor|        ME|
|5761356.0|2016.0|   4.0| 297.0| 297.0|    BGM|    1.0|            null|  43.0|    1.0|     DOH| null| 1973.0|     M|    348|9.499028833E10|      B1|  2016-04-30|  

**Dropping columns**

In [47]:
drop_list = ['i94port']
imm_df = imm_df.drop(*drop_list)

In [48]:
imm_df.columns

['cicid',
 'i94yr',
 'i94mon',
 'i94cit',
 'i94res',
 'i94mode',
 'us_arrival_state',
 'i94bir',
 'i94visa',
 'visapost',
 'occup',
 'biryear',
 'gender',
 'airline',
 'admnum',
 'visatype',
 'arrival_date',
 'departure_date',
 'port_code',
 'port_city',
 'port_state']

In [49]:
imm_df.count()

3096313

In [52]:
# Write to parquet
imm_df.write.parquet("immig_data")

### Airport Codes Data
----------

1. Initial checks i.e. shape of df, columns, data types of columns etc.
2. Filtering for U.S. airports only
3. Check for NaN values
4. Calculations
5. Dropping uneccessary columns
6. Renaming columns
7. Saving to new CSV

In [39]:
airport_codes_df = pd.read_csv('airport-codes_csv.csv')

In [40]:
airport_codes_df.shape

(55075, 12)

In [41]:
airport_codes_df.columns

Index(['ident', 'type', 'name', 'elevation_ft', 'continent', 'iso_country',
       'iso_region', 'municipality', 'gps_code', 'iata_code', 'local_code',
       'coordinates'],
      dtype='object')

In [42]:
airport_codes_df.dtypes

ident            object
type             object
name             object
elevation_ft    float64
continent        object
iso_country      object
iso_region       object
municipality     object
gps_code         object
iata_code        object
local_code       object
coordinates      object
dtype: object

In [43]:
airport_codes_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [44]:
airport_codes_df['iso_country'].nunique()

243

This dataset contains data for multiple countries. 

We will be dropping data for any country that is not the U.S. - this is because the Immigration dataset we will be using refers to immigration into the U.S. only.

In [45]:
# Filtering the dataset so we only have airport data for the U.S.

us_airports_df = airport_codes_df[airport_codes_df['iso_country'] == 'US']

In [46]:
us_airports_df.shape

(22757, 12)

Filtering the dataset for 'iso_country' == 'US' has reduced the number of rows by approximately half.

**Checking for NaN Values**

In [47]:
us_airports_df.isnull().values.any()

True

In [48]:
us_airports_df.isnull().sum()

ident               0
type                0
name                0
elevation_ft      239
continent       22756
iso_country         0
iso_region          0
municipality      102
gps_code         1773
iata_code       20738
local_code       1521
coordinates         0
dtype: int64

In [49]:
us_airports_df.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


**Checking for duplicates**

In [51]:
us_airports_df.duplicated().sum()

0

**Dropping rows and columns**

In [52]:
us_airports_df = us_airports_df.drop(['elevation_ft', 'continent', 'gps_code', 'local_code', 'coordinates'], axis=1)

In [53]:
us_airports_df.columns

Index(['ident', 'type', 'name', 'iso_country', 'iso_region', 'municipality',
       'iata_code'],
      dtype='object')

In [54]:
us_airports_df.head()

Unnamed: 0,ident,type,name,iso_country,iso_region,municipality,iata_code
0,00A,heliport,Total Rf Heliport,US,US-PA,Bensalem,
1,00AA,small_airport,Aero B Ranch Airport,US,US-KS,Leoti,
2,00AK,small_airport,Lowell Field,US,US-AK,Anchor Point,
3,00AL,small_airport,Epps Airpark,US,US-AL,Harvest,
4,00AR,closed,Newport Hospital & Clinic Heliport,US,US-AR,Newport,


It looks like the 'municipality' columns corresponds to a city in the US. Therefore, we will likely use this column to join the Airports table to other tables in the database.

We will look at how many NaN values there are for the municipality column.

In [58]:
us_airports_df.isnull().sum()

ident               0
type                0
name                0
iso_country         0
iso_region          0
municipality      102
iata_code       20738
dtype: int64

In [57]:
us_airports_df.shape

(22757, 7)

There are only 102 missing values for municipality out of 22757. Therefore, we will drop any rows where municipality has a NaN value

In [59]:
us_airport_df_final = us_airports_df.dropna(subset=["municipality"])

In [61]:
us_airport_df_final.shape

(22655, 7)

**Renaming columns**

In [84]:
us_airport_df_final.rename(columns={'type': 'airport_type', 'name': 'airport_name'}, inplace=True)
us_airport_df_final.columns

Index(['ident', 'airport_type', 'airport_name', 'iso_country', 'iso_region',
       'municipality', 'iata_code'],
      dtype='object')

**Extracting the state code from the iso_region field**

In [64]:
us_airport_df_final.head()

Unnamed: 0,ident,type,name,iso_country,iso_region,municipality,iata_code
0,00A,heliport,Total Rf Heliport,US,US-PA,Bensalem,
1,00AA,small_airport,Aero B Ranch Airport,US,US-KS,Leoti,
2,00AK,small_airport,Lowell Field,US,US-AK,Anchor Point,
3,00AL,small_airport,Epps Airpark,US,US-AL,Harvest,
4,00AR,closed,Newport Hospital & Clinic Heliport,US,US-AR,Newport,


In [83]:
us_airport_df_final['state_code'] = us_airport_df_final['iso_region'].str.split("-").str[-1]

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  """Entry point for launching an IPython kernel.


In [84]:
us_airport_df_final.head()

Unnamed: 0,ident,type,name,iso_country,iso_region,municipality,iata_code,state_code
0,00A,heliport,Total Rf Heliport,US,US-PA,Bensalem,,PA
1,00AA,small_airport,Aero B Ranch Airport,US,US-KS,Leoti,,KS
2,00AK,small_airport,Lowell Field,US,US-AK,Anchor Point,,AK
3,00AL,small_airport,Epps Airpark,US,US-AL,Harvest,,AL
4,00AR,closed,Newport Hospital & Clinic Heliport,US,US-AR,Newport,,AR


Let's check if there are any NaN values in the new 'state_code' column.

In [86]:
us_airport_df_final.isnull().sum()

ident               0
type                0
name                0
iso_country         0
iso_region          0
municipality        0
iata_code       20642
state_code          0
dtype: int64

In [87]:
# Saving to CSV
us_airport_df_final.to_csv("us_airports_clean.csv", index=False)

### Temperature Data
-------

1. Check dataset
2. Convert datatype of the date column to a datetime object
3. Drop rows with NaN values and check for duplicate rows
4. Drop unecessary columns
5. Filter for U.S data only
6. Rename columns
7. Add port code from the SAS text file

In [3]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
temp_df = pd.read_csv(fname)

In [4]:
temp_df.shape

(8599212, 7)

In [5]:
temp_df.dtypes

dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
dtype: object

In [6]:
temp_df.columns

Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude'],
      dtype='object')

In [7]:
temp_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


**Converting date to a datetime object**

In [8]:
temp_df['dt'] = pd.to_datetime(temp_df['dt'])

In [9]:
temp_df['year'] = temp_df['dt'].dt.year

In [10]:
temp_df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,year
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E,1743
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E,1743
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E,1744
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E,1744
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E,1744


In [11]:
temp_df['year'].max()

2013

In [12]:
temp_df['year'].min()

1743

In [19]:
temp_df['City'].nunique()

3448

The temperature dataset contains data for 3448 cities.

In [20]:
temp_df['Country'].nunique()

159

The temperature dataset contains data for 159 countries.

**Checking for NaN Values**

In [13]:
temp_df.isnull().values.any()

True

In [14]:
temp_df.isnull().sum()

dt                                    0
AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
year                                  0
dtype: int64

Only two columns have NaN values: AverageTemperature and AverageTemperatureUncertainty. They also have the exact same number of NaN values.

In [15]:
# Calculating the percentage of NaN values for each column

nan_temp_df = pd.DataFrame(data=temp_df.isnull().sum(), columns=['NaN'])

nan_temp_df.drop(nan_temp_df[nan_temp_df['NaN'] == 0].index, inplace = True)

nan_temp_df['% of NaN'] = (nan_temp_df['NaN']/temp_df.count())*100
nan_temp_df

Unnamed: 0,NaN,% of NaN
AverageTemperature,364130,4.421692
AverageTemperatureUncertainty,364130,4.421692


In [16]:
temp_df[temp_df['AverageTemperature'].isnull()]

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,year
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E,1743
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E,1744
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E,1744
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E,1744
9,1744-08-01,,,Århus,Denmark,57.05N,10.33E,1744
18,1745-05-01,,,Århus,Denmark,57.05N,10.33E,1745
19,1745-06-01,,,Århus,Denmark,57.05N,10.33E,1745
20,1745-07-01,,,Århus,Denmark,57.05N,10.33E,1745
21,1745-08-01,,,Århus,Denmark,57.05N,10.33E,1745
22,1745-09-01,,,Århus,Denmark,57.05N,10.33E,1745


We can go with the assumption that if a row has a NaN value in the 'AverageTemperature' column then it will also have a NaN value in the 'AverageTemperatureUncertainty'. So, we will drop all rows with NaN values.

In [16]:
temp_df.dropna(axis=0, inplace=True)

In [17]:
temp_df.shape

(8235082, 8)

In [21]:
temp_df.isna().sum()

dt                               0
AverageTemperature               0
AverageTemperatureUncertainty    0
City                             0
Country                          0
Latitude                         0
Longitude                        0
year                             0
dtype: int64

**Checking for duplicates**

In [22]:
temp_df.duplicated().sum()

0

There are no duplicate rows in this dataset.

*For this project, we will be focusing solely on the U.S therefore we will filter the dataset to only include data for U.S. cities.*

In [23]:
temp_df = temp_df[temp_df['Country'] == 'United States']

In [24]:
temp_df.shape

(661524, 8)

**Renaming columns**

In [25]:
temp_df.rename(columns={'dt': 'date'}, inplace=True)
temp_df.columns

Index(['date', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City',
       'Country', 'Latitude', 'Longitude', 'year'],
      dtype='object')

In [26]:
temp_df.rename(columns={'AverageTemperature': 'avg_temp', 'City': 'city', 'Country': 'country'}, inplace=True)
temp_df.columns

Index(['date', 'avg_temp', 'AverageTemperatureUncertainty', 'city', 'country',
       'Latitude', 'Longitude', 'year'],
      dtype='object')

**Dropping columns**

In [28]:
# Dropping 3 columns

temp_df = temp_df.drop(['AverageTemperatureUncertainty', 'Latitude', 'Longitude'], axis=1)
temp_df.columns

Index(['date', 'avg_temp', 'city', 'country', 'year'], dtype='object')

In [30]:
temp_df.head()

Unnamed: 0,date,avg_temp,city,country,year
47555,1820-01-01,2.101,Abilene,United States,1820
47556,1820-02-01,6.926,Abilene,United States,1820
47557,1820-03-01,10.767,Abilene,United States,1820
47558,1820-04-01,17.989,Abilene,United States,1820
47559,1820-05-01,21.809,Abilene,United States,1820


In [31]:
# Saving the clean df to a new csv
temp_df.to_csv('temp_data_clean.csv', index=False)

### Extracting the required information from the I94_SAS_Labels_Descriptions.SAS file
----

In [32]:
# Referenced from the following knowledge thread: https://knowledge.udacity.com/questions/125439
# Referenced from the following knowledge thread: https://knowledge.udacity.com/questions/801811

with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')
    
def code_mapper(file, idx):
    f_content2 = f_content[f_content.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    f_content2 = [i.replace("'", "") for i in f_content2]
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic

i94_cit_and_res = code_mapper(f_content, "i94cntyl")
i94_port = code_mapper(f_content, "i94prtl")
i94_mode = code_mapper(f_content, "i94model")
i94_addr = code_mapper(f_content, "i94addrl")
i94_visa = {'1':'Business',
'2': 'Pleasure',
'3' : 'Student'}

**Move dictionaries into pandas DF and then saving them as CSV**

*i94_port data*

In [51]:
i94_port_df = pd.DataFrame.from_dict(i94_port, orient='index', columns=['port_city']).reset_index()

In [52]:
i94_port_df.head()

Unnamed: 0,index,port_city
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


In [53]:
i94_port_df['port_state'] = i94_port_df['port_city'].str.split(",").str[-1]

In [55]:
i94_port_df['port_city'] = i94_port_df['port_city'].str.split(",").str[0]

In [57]:
i94_port_df['port_city'] = i94_port_df['port_city'].str.title()

In [59]:
i94_port_df.rename(columns={'index': 'port_code'}, inplace=True)
i94_port_df.columns

Index(['port_code', 'port_city', 'port_state'], dtype='object')

In [60]:
i94_port_df.head()

Unnamed: 0,port_code,port_city,port_state
0,ALC,Alcan,AK
1,ANC,Anchorage,AK
2,BAR,Baker Aaf - Baker Island,AK
3,DAC,Daltons Cache,AK
4,PIZ,Dew Station Pt Lay Dew,AK


In [61]:
i94_port_df.shape

(660, 3)

In [62]:
i94_port_df['port_city'].nunique()

634

There are 634 cities in this table (some cities in the US have the same name which accounts for the difference between the value of records shown in the .shape action).

In [63]:
i94_port_df['port_state'].nunique()

178

There are 178 states in this table.

In [64]:
i94_port_df.duplicated().sum()

0

In [65]:
# Saving to CSV
i94_port_df.to_csv("i94_port.csv", index=False)

*i94_visa data*

In [70]:
i94_visa

{'1': 'Business', '2': 'Pleasure', '3': 'Student'}

In [71]:
i94_visa_df = pd.DataFrame.from_dict(i94_visa, orient='index', columns=['visa_type']).reset_index()

In [74]:
i94_visa_df.rename(columns={'index': 'i94_visa_code'}, inplace=True)
i94_visa_df.columns

Index(['i94_visa_code', 'visa_type'], dtype='object')

In [75]:
i94_visa_df.head()

Unnamed: 0,i94_visa_code,visa_type
0,1,Business
1,2,Pleasure
2,3,Student


In [76]:
# Saving to CSV
i94_visa_df.to_csv("i94_visa.csv", index=False)

*i94_cit_and_res*

In [81]:
i94_cit_and_res_df = pd.DataFrame.from_dict(i94_cit_and_res, orient='index', columns=['country']).reset_index()

In [83]:
i94_cit_and_res_df.rename(columns={'index': 'i94_cit_res_code'}, inplace=True)
i94_cit_and_res_df.columns

Index(['i94_cit_res_code', 'country'], dtype='object')

In [85]:
i94_cit_and_res_df['country'] = i94_cit_and_res_df['country'].str.title()

In [86]:
i94_cit_and_res_df.head()

Unnamed: 0,i94_cit_res_code,country
0,582,"Mexico Air Sea, And Not Reported (I-94, No Lan..."
1,236,Afghanistan
2,101,Albania
3,316,Algeria
4,102,Andorra


In [109]:
i94_cit_and_res_df['country'].nunique()

287

In [110]:
i94_cit_and_res_df.shape

(289, 2)

In [88]:
i94_cit_and_res_df.to_csv('i94_cit_and_res.csv', index=False)

*i94_mode*

In [89]:
i94_mode

{'1': 'Air', '2': 'Sea', '3': 'Land', '9': 'Not reported'}

In [92]:
i94_mode_df = pd.DataFrame.from_dict(i94_mode, orient='index', columns=['mode']).reset_index()

In [93]:
i94_mode_df.rename(columns={'index': 'i94_mode_code'}, inplace=True)
i94_mode_df.columns

Index(['i94_mode_code', 'mode'], dtype='object')

In [94]:
i94_mode_df.head()

Unnamed: 0,i94_mode_code,mode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [95]:
i94_mode_df.to_csv('i94_mode.csv', index=False)

*i94_addr*

In [101]:
i94_addr_df = pd.DataFrame.from_dict(i94_addr, orient='index', columns=['state']).reset_index()

In [102]:
i94_addr_df.rename(columns={'index': 'i94_addr_code'}, inplace=True)
i94_addr_df.columns

Index(['i94_addr_code', 'state'], dtype='object')

In [104]:
i94_addr_df['state'] = i94_addr_df['state'].str.title()

In [105]:
i94_addr_df.head()

Unnamed: 0,i94_addr_code,state
0,AL,Alabama
1,AK,Alaska
2,AZ,Arizona
3,AR,Arkansas
4,CA,California


In [108]:
i94_addr_df.shape

(55, 2)

In [107]:
i94_addr_df['state'].nunique()

55

In [106]:
i94_addr_df.to_csv('i94_addr.csv', index=False)

**Creating a CSV with cities, states and state_codes**

In [51]:
city_states_df = dems_df[['city', 'state', 'state_code']]

In [60]:
city_states_df.head()

Unnamed: 0,city,state,state_code
0,Abilene,Texas,TX
1,Akron,Ohio,OH
2,Alafaya,Florida,FL
3,Alameda,California,CA
4,Albany,Georgia,GA


In [56]:
city_states_df['city'].nunique()

567

In [57]:
city_states_df['state'].nunique()

49

In [58]:
city_states_df['state_code'].nunique()

49

In [61]:
city_states_df.to_csv('city_states_codes.csv', index=False)

### Adding 'state' and 'state_code' to Temperature dataset
-----

In [104]:
temp_df = pd.read_csv('temp_data_clean.csv')

In [105]:
temp_df.shape

(661524, 5)

In [106]:
temp_df['city'].nunique()

248

In [112]:
temp_df.isnull().sum()

date        0
avg_temp    0
city        0
country     0
year        0
dtype: int64

In [107]:
city_states_df.head()

Unnamed: 0,city,state,state_code
0,Abilene,Texas,TX
1,Akron,Ohio,OH
2,Alafaya,Florida,FL
3,Alameda,California,CA
4,Albany,Georgia,GA


In [108]:
city_states_df['city'].nunique()

567

In [109]:
temp_df_final = pd.merge(temp_df, city_states_df, how='left', left_on ='city', right_on='city')

In [122]:
temp_df_final.to_csv('temp_df_final.csv', index=False)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model:

The data model for this project is a Star Schema.

I chose a Star Schema because it allows for simple queries which will be more efficient for the individuals running the analysis on the data.

This data model includes one Fact table and four Dimension tables.

**Fact Table:**
- fact_immigration

**Dimension Tables:**
- dim_visa
- dim_demographics
- dim_airports
- dim_temp

**fact_immigration** - this table will contain data from the i94 dataset and will include the following columns:
- cicid
- i94mode
- i94yr
- us_arrival_state
- port_state
- port_city
- port_state
- i94bir
- i94_cit
- i94_res
- i94_visa_code
- arr_date
- dep_date
- bir_year
- age
- gender
- occupation
- airline


**dim_visa** - this table will contain visa information from the i94 dataset and will include the following columns:
- cicid
- i94_visa
- visa_type

**dim_airports** - this table will contain information about US airports and will include the following columns:
- ident
- type
- name
- iso_country
- iso_region
- municipality
- state_code

**dim_demographics** - this table will contain information about the demographic breakdown of various US cities and will include the following columns: 
- city 
- state
- median_age
- tot_pop
- avg_house_size
- state_code
- pct_male_pop
- pct_female_pop
- pct_native
- pct_asian
- pct_black
- pct_hispanic
- pct_white
- pct_foreign_born
- pct_veterans

**dim_temp** - this table will contain information about the temperatures of various US cities over several years and will include the following columns:
- date
- avg_temp
- city
- country
- year
- state
- state_code

#### 3.2 Mapping Out Data Pipelines

Steps taken to map out the data pipelines:
1. Extract and clean data (done in Section 2)
2. Create Dimension tables
3. Create Fact table
4. Save the Dimension and Fact tables to parquet files
5. Perform data quality checks

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [7]:
# Reading in the file for the Fact Table as a spark df
immigration_df = spark.read.parquet("immig_data")

In [8]:
# Reading in the files for the Dimension Tables as spark DFs
demographics_df = spark.read.csv('demographics_data_clean.csv', header=True)
airports_df = spark.read.csv('us_airports_clean.csv', header=True)
temps_df = spark.read.csv('temp_df_final.csv', header=True)

In [9]:
visas_df = spark.read.csv('i94_visa.csv', header=True)

**Creating temporary views for all tables**

In [11]:
immigration_df.createOrReplaceTempView("immigration")

In [12]:
demographics_df.createOrReplaceTempView("demographics")

In [13]:
airports_df.createOrReplaceTempView("airports")

In [14]:
temps_df.createOrReplaceTempView("temperatures")

In [15]:
visas_df.createOrReplaceTempView("visas")

**Creating the Dimension table: dim_visa**

In [74]:
dim_visa = spark.sql('''
SELECT im.cicid, v.i94_visa_code, v.visa_type 
FROM immigration im
JOIN visas v
ON im.i94visa = v.i94_visa_code
WHERE im.i94mode = 1
''')

In [75]:
dim_visa.show(5)

+---------+-------------+---------+
|    cicid|i94_visa_code|visa_type|
+---------+-------------+---------+
|4858657.0|            2| Pleasure|
|5748886.0|            2| Pleasure|
|5748902.0|            2| Pleasure|
|5748921.0|            2| Pleasure|
|5748922.0|            2| Pleasure|
+---------+-------------+---------+
only showing top 5 rows



In [79]:
dim_visa.count()

2994505

**Creating the Dimension table: dim_demographics**

In [80]:
dim_demographics = spark.sql('''
SELECT *
FROM demographics
''')

In [81]:
dim_demographics.count()

596

In [82]:
dim_demographics.show(5)

+-------+----------+----------+-------+--------------+----------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+
|   city|     state|median_age|tot_pop|avg_house_size|state_code|      pct_male_pop|    pct_female_pop|        pct_native|         pct_asian|         pct_black|      pct_hispanic|        pct_white|  pct_foreign_born|      pct_veterans|
+-------+----------+----------+-------+--------------+----------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+
|Abilene|     Texas|      31.3| 125876|          2.64|        TX| 51.80653976929677| 48.19346023070323|1.4403063332168164|2.3268931329244653|11.478756871842132|26.392640376243293|75.85798722552353| 6.457942737297023|7.4414503161841825|
|  Akron|      Ohio|      38.1| 197553|          2.24|  

**Creating the Dimension table: dim_airports**

In [86]:
dim_airports = spark.sql('''
SELECT ident, name, municipality, state_code, type, iso_country, iso_region
FROM airports
''')

In [87]:
dim_airports.count()

22655

In [88]:
dim_airports.show(5)

+-----+--------------------+------------+----------+-------------+-----------+----------+
|ident|                name|municipality|state_code|         type|iso_country|iso_region|
+-----+--------------------+------------+----------+-------------+-----------+----------+
|  00A|   Total Rf Heliport|    Bensalem|        PA|     heliport|         US|     US-PA|
| 00AA|Aero B Ranch Airport|       Leoti|        KS|small_airport|         US|     US-KS|
| 00AK|        Lowell Field|Anchor Point|        AK|small_airport|         US|     US-AK|
| 00AL|        Epps Airpark|     Harvest|        AL|small_airport|         US|     US-AL|
| 00AR|Newport Hospital ...|     Newport|        AR|       closed|         US|     US-AR|
+-----+--------------------+------------+----------+-------------+-----------+----------+
only showing top 5 rows



**Creating the Dimension table: dim_temp**

In [89]:
dim_temp = spark.sql('''
SELECT *
FROM temperatures
''')

In [90]:
dim_temp.count()

750304

In [91]:
dim_temp.show(5)

+----------+------------------+-------+-------------+----+-----+----------+
|      date|          avg_temp|   city|      country|year|state|state_code|
+----------+------------------+-------+-------------+----+-----+----------+
|1820-01-01|2.1010000000000004|Abilene|United States|1820|Texas|        TX|
|1820-02-01|             6.926|Abilene|United States|1820|Texas|        TX|
|1820-03-01|            10.767|Abilene|United States|1820|Texas|        TX|
|1820-04-01|17.988999999999994|Abilene|United States|1820|Texas|        TX|
|1820-05-01|            21.809|Abilene|United States|1820|Texas|        TX|
+----------+------------------+-------+-------------+----+-----+----------+
only showing top 5 rows



**Creating the Fact table: fact_immigration**

In [94]:
# Filtering the immigration data to only select arrivals into the US by air
fact_immigration = spark.sql('''
SELECT cicid, i94yr, i94cit, i94res, i94mode, i94bir, i94visa AS i94_visa_code, us_arrival_state, arrival_date, departure_date, port_code, port_city, port_state, occup AS occupation, biryear AS birth_year, gender 
FROM immigration
WHERE i94mode = 1
''')

In [95]:
# Checking that the table was created correctly by checking the count of records
fact_immigration.count()

2994505

**Saving the fact and dimension tables**

In [97]:
# Writing the fact and dimension tables to parquet
fact_immigration.write.mode('overwrite').parquet('fact_immigration')
dim_demographics.write.mode('overwrite').parquet('dim_demographics')
dim_airports.write.mode('overwrite').parquet('dim_airports')
dim_temp.write.mode('overwrite').parquet('dim_temp')
dim_visa.write.mode('overwrite').parquet('dim_visa')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected.
 
**Run Quality Checks**

In [6]:
# Reading in the the parquet files
fact_immigration = spark.read.parquet('fact_immigration')
dim_demographics = spark.read.parquet('dim_demographics')
dim_airports = spark.read.parquet('dim_airports')
dim_temp = spark.read.parquet('dim_temp')
dim_visa = spark.read.parquet('dim_visa')

**1. Checking that the fact and dimension tables exist and that data has been inserted into all the tables**

In [33]:
def check_count(table):
    '''This function checks that there are more than 0 records in a table.'''
    
    if table.count() > 0:
        print('Data has been successfully inserted into the table.')
    else:
        print('Data quality check failed. Data has NOT been inserted into the table.')

In [34]:
check_count(fact_immigration)
check_count(dim_demographics)
check_count(dim_airports)
check_count(dim_temp)
check_count(dim_visa)

Data has been successfully inserted into the table.
Data has been successfully inserted into the table.
Data has been successfully inserted into the table.
Data has been successfully inserted into the table.
Data has been successfully inserted into the table.


**2. Checking that the number of records in each table is correct**

In [36]:
def check_no_records(csv, table):
    
    df = pd.read_csv(csv)
    
    if table.count() == df.shape[0]:
        print('Data quality check passed.')
    else:
        print('Data quality check failed.')

In [37]:
check_no_records('temp_df_final.csv', dim_temp)
check_no_records('demographics_data_clean.csv', dim_demographics)
check_no_records('us_airports_clean.csv', dim_airports)

Data quality check passed.
Data quality check passed.
Data quality check passed.


In [38]:
# For the visa and immigration tables we will check that the number of records of both tables is the same
if dim_visa.count() == fact_immigration.count():
    print('Data quality check passed.')
else:
    print('Data quality check failed.')

Data quality check passed.


**3. Group By sql queries to check if the data returned can be useful**

In [7]:
fact_immigration.createOrReplaceTempView("fact_immigration")
dim_visa.createOrReplaceTempView("dim_visa")
dim_demographics.createOrReplaceTempView("dim_demographics")
dim_airports.createOrReplaceTempView("dim_airports")
dim_temp.createOrReplaceTempView("dim_temp")

- Example query: What is the average temperature per year per state?

In [15]:
# Group by year, state and state_code
# Filter for Texas
test_query_one = spark.sql('''
SELECT state, state_code, year, AVG(avg_temp) AS average_tempearture
FROM dim_temp
WHERE state = 'Texas'
GROUP BY state, state_code, year
ORDER BY year
''')

test_query_one.show(10)

+-----+----------+----+-------------------+
|state|state_code|year|average_tempearture|
+-----+----------+----+-------------------+
|Texas|        TX|1743|              5.339|
|Texas|        TX|1744| 13.880749999999999|
|Texas|        TX|1745|            3.99625|
|Texas|        TX|1750|  12.50590909090909|
|Texas|        TX|1751|           13.27775|
|Texas|        TX|1752|  5.451499999999999|
|Texas|        TX|1753| 11.913666666666664|
|Texas|        TX|1754| 12.221555555555556|
|Texas|        TX|1755|              9.151|
|Texas|        TX|1756|  12.26388888888889|
+-----+----------+----+-------------------+
only showing top 10 rows



- Example query: Relationship between the state that immigrants have decided to settle in and the demographic breakdown of those states

In [47]:
test_query_two = spark.sql('''
SELECT DISTINCT(i.us_arrival_state), d.state, COUNT(i.us_arrival_state) AS arrival_state_count, AVG(d.pct_male_pop) AS avg_pct_male_pop, AVG(d.pct_female_pop) AS avg_pct_female_pop, AVG(d.pct_foreign_born) AS avg_pct_foreign_born, AVG(d.pct_veterans) AS avg_pct_veterans, AVG(d.pct_native) AS avg_pct_native, AVG(d.pct_asian) AS avg_pct_asian, AVG(d.pct_black) AS avg_pct_black, AVG(d.pct_hispanic) AS avg_pct_hispanic, AVG(d.pct_white) AS avg_pct_white
FROM fact_immigration i
JOIN dim_demographics d
ON i.us_arrival_state = d.state_code
GROUP BY i.us_arrival_state, d.state
ORDER BY arrival_state_count DESC''')

test_query_two.show(10)

+----------------+-------------+-------------------+------------------+------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|us_arrival_state|        state|arrival_state_count|  avg_pct_male_pop|avg_pct_female_pop|avg_pct_foreign_born|  avg_pct_veterans|    avg_pct_native|     avg_pct_asian|     avg_pct_black|  avg_pct_hispanic|     avg_pct_white|
+----------------+-------------+-------------------+------------------+------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|              CA|   California|           63426068| 49.36359840241623| 50.63640159732174|  27.574515157502372| 4.126470420415788|1.6653726253289538|17.925322181813687| 7.450919429970588| 37.81154866182277| 62.66834268695217|
|              FL|      Florida|           29608416| 48.06068047709096| 51.93931952339614|   24.

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

**fact_immigration**

Source: i94 immigration data

|Column Name          |Description|
|--------------|------|
|cicid          |Unique identifier     |
|i94yr         |4 digit year of arrival    |
|i94cit         |Country of citizenship    |
|i94res        |Country of residence    |
|i94mode          |Mode of transport into the U.S.    |
|i94bir         |Age of non-immigrant     |
|i94_visa_code      |Code for visa type     |
|us_arrival_state         |Final address of the migrants     |
|arrival_date        |Arrival date     |
|departure_date         |Departure date    |
|port_code         |Code for port of arrival |
|port_city         |City that the port of arrival is in |
|port_state         |State that the port of arrival is in |
|occupation         |Occupation of immigrant |
|birth_year         |4 digit year of birth |
|gender         |Non-immigrant sex |


**dim_temperature**

Source: World Temperatures Data

|Column Name          |Description|
|--------------|------|
|date       |Date     |
|avg_temp         |Average Temp. in Celsius    |
|city        |City name     |
|country       |Country     |
|year        |Year (extracted from the date column    |
|state       |State Name  |
|state_code       |State code  |

**dim_demographics**

Source: U.S. City Demographic Data

|Column Name          |Description|
|--------------|------|
|city        |City name     |
|state         |State mame    |
|state_code       |State code    |
|median_age         |Median age of the population    |
|avg_house_size         |Median age of the population    |
|tot_pop         |Total population     |
|pct_male_pop        |Percentage of population that is male population    |
|pct_female_pop         |Percentage of population that is female population     |
|pct_foreign_born       |Percentage of population that is foreign born    |
|pct_native       |Percentage of population that is American Indian and Alaska Native      |
|pct_asian        |Percentage of population that is Asian      |
|pct_black         |Percentage of population that is Black or African-American    |
|pct_hispanic        |Percentage of population that is Hispanic or Latino  |
|pct_white       |Percentage of population that is White  |
|pct_veterans      |Percentage of population that are veterans  |

**dim_airports**

Source: Airport Code Table

|Column Name          |Description|
|--------------|------|
|ident          |Identifier     |
|name        |Name of airport   |
|municipality         |Municipality of the airport    |
|state_code        |State of airport    |
|type        |Type of airport  |
|iso_country        |ISO code of the country that the airport resides in    |
|iso_region      |ISO code of the region that the airport resides in    |


**dim_visa**

Source: i94 immigration data

|Column Name          |Description|
|--------------|------|
|cicid          |Unique identifier from the i94 dataset which represents each immigrant   |
|i94_visa_code          |Code representing the type of visa    |
|visa_type        |Type of visa   |


#### Step 5: Complete Project Write Up


**Clearly state the rationale for the choice of tools and technologies for the project.**

Technologies used:
- Python (pandas)
- Apache Spark (PySpark)

The reasons I chose to use Spark for this project are because:
- Spark is a Big Data framework and is able to process large amounts of data fairly efficiently and as the immigration data from the i94 dataset has around 3M records, it made sense to use Spark
- Spark also includes several libraries that can be used for data analysis - in this project we used Spark SQL to create the fact and dimension tables

Pandas was chosen as it works well for data manipulation and data analysis.

I also wrote files to the Parquet file format. Apache Parquet is an open source, column-oriented data file format - it was used for this project as it allows for efficient data retrieval (which helped when the files were queried using Spark).

**Propose how often the data should be updated and why.**

The i94 immigration dataset is split into monthly files so it makes sense for the fact_immigration table to be updated monthly. The dimension tables can therefore also be updated on a monthly basis.

**Write a description of how you would approach the problem differently under the following scenarios:**

**1. The data was increased by 100x.**

If the size of the data was increased by 100x, I would use the following technologies:
- Still use Spark as it can handle large amounts of data


**2. The data populates a dashboard that must be updated on a daily basis by 7am every day.**

Use Apache Airflow to schedule and run the data pipelines at 7am every day.

**3. The database needed to be accessed by 100+ people.**

You can store the databse on an Amazon Redshift cluster. You can then gie multiuser access to the relevant people.