# Project Title
### Udacity Data Engineering Capstone Project

#### Project Summary
This project creates a data warehouse that serves as a single source of truth for data from multiple sources.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import pyspark

import os
import configparser
import logging
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType
from pyspark.sql.functions import udf, col, lit, year, month, upper, to_date
from pyspark.sql.functions import monotonically_increasing_id

### Step 1: Scope the Project and Gather Data

#### Scope 
This project integrates I94 immigration data, U.S. city demographic data, and world temperature data into a data warehouse of fact and dimension tables for analysis and business intelligence. <br>

Spark and Python will be used to create the ETL pipeline with AWS S3 serving as the storage for our data and tables.

All data was provided by Udacity. Links to the original sources:
1. [I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)
2. [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)
3. [U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)

#### Describe and Gather Data 
| Data Set | Format | Description |
| ---      | ---    | ---         |
|[I94 Immigration Data](https://travel.trade.gov/research/reports/i94/historical/2016.html)| SAS | Data contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries).|
|[World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)| CSV | This dataset is from Kaggle and contains monthly average land temperatures for cities around the world.|
|[U.S. City Demographic Data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)| CSV | This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000.|

In [2]:
# Read in the data here
df_immi = pd.read_csv('immigration_data_sample.csv')

In [3]:
df_immi.head()

| | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | ... | | M | 1955.0 | 7202016 | F | | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | ... | | M | 1990.0 | 10222016 | M | | *GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | ... | | M | 1940.0 | 7052016 | M | | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | ... | | M | 1991.0 | 10272016 | M | | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | ... | | M | 1997.0 | 7042016 | F | | | 42322570000.0 | LAND | WT |


### Step 2: Explore and Assess the Data

#### Explore Immigration Data

In [4]:
df_immi.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

In [5]:
# .copy() makes an independent DataFrame, so the column assignments below
# do not raise SettingWithCopyWarning
immigration_fact = df_immi[['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr', 'arrdate', 'depdate', 'i94mode', 'i94visa']].copy()
immigration_fact.columns = ['cic_id', 'year', 'month', 'city_code', 'state_code', 'arrival_date', 'departure_date', 'mode', 'visa']
immigration_fact.head()

| | cic_id | year | month | city_code | state_code | arrival_date | departure_date | mode | visa |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | HI | 20566.0 | 20573.0 | 1.0 | 2.0 |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | TX | 20567.0 | 20568.0 | 1.0 | 2.0 |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | FL | 20551.0 | 20571.0 | 1.0 | 2.0 |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | CA | 20572.0 | 20581.0 | 1.0 | 2.0 |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | NY | 20550.0 | 20553.0 | 3.0 | 2.0 |


In [6]:
# Add Country column
immigration_fact['country'] = 'United States'
immigration_fact.head()

| | cic_id | year | month | city_code | state_code | arrival_date | departure_date | mode | visa | country |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | HI | 20566.0 | 20573.0 | 1.0 | 2.0 | United States |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | TX | 20567.0 | 20568.0 | 1.0 | 2.0 | United States |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | FL | 20551.0 | 20571.0 | 1.0 | 2.0 | United States |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | CA | 20572.0 | 20581.0 | 1.0 | 2.0 | United States |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | NY | 20550.0 | 20553.0 | 3.0 | 2.0 | United States |


In [7]:
# create personal info dimension table
personal_dim = df_immi[['cicid', 'i94cit', 'i94res', 'biryear', 'gender']]
personal_dim.columns = ['cic_id', 'citizen_country', 'residence_country', 'birth_year', 'gender']
personal_dim.head(5)

| | cic_id | citizen_country | residence_country | birth_year | gender |
|---|---|---|---|---|---|
| 0 | 4084316.0 | 209.0 | 209.0 | 1955.0 | F |
| 1 | 4422636.0 | 582.0 | 582.0 | 1990.0 | M |
| 2 | 1195600.0 | 148.0 | 112.0 | 1940.0 | M |
| 3 | 5291768.0 | 297.0 | 297.0 | 1991.0 | M |
| 4 | 985523.0 | 111.0 | 111.0 | 1997.0 | F |


In [8]:
# create airline dimension table
airline_dim = df_immi[['cicid', 'airline', 'admnum', 'fltno', 'visatype']]
airline_dim.columns = ['cic_id', 'airline', 'adm_num', 'flight_num', 'visa_type']
airline_dim.head()

| | cic_id | airline | adm_num | flight_num | visa_type |
|---|---|---|---|---|---|
| 0 | 4084316.0 | JL | 56582670000.0 | 00782 | WT |
| 1 | 4422636.0 | *GA | 94362000000.0 | XBLNG | B2 |
| 2 | 1195600.0 | LH | 55780470000.0 | 00464 | WT |
| 3 | 5291768.0 | QR | 94789700000.0 | 00739 | B2 |
| 4 | 985523.0 | | 42322570000.0 | LAND | WT |


#### Explore Global Temperature Data

In [9]:
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df_temp = pd.read_csv(fname)
df_temp.head()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


In [10]:
# no data for year 2016 to match with our immigration data...
df_temp[df_temp['dt'] >= '2016-01-01']

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
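The empty result confirms that the temperature file has no rows from 2016 onward, so a temperature dimension keyed on year and month will not match the April 2016 immigration facts. A minimal standard-library sketch for checking the latest month with a measured temperature per country; `latest_reading_by_country` is a hypothetical helper, and the demo uses the first two rows shown above.

```python
import csv
import io


def latest_reading_by_country(csv_file):
    """Return {country: latest dt that has a non-empty AverageTemperature}."""
    latest = {}
    for row in csv.DictReader(csv_file):
        if not row['AverageTemperature']:
            continue  # skip months without a measurement
        country = row['Country']
        # ISO dates (YYYY-MM-DD) sort correctly as plain strings
        if row['dt'] > latest.get(country, ''):
            latest[country] = row['dt']
    return latest


# For the full file, something like:
# with open('../../data2/GlobalLandTemperaturesByCity.csv', newline='', encoding='utf-8') as f:
#     print(latest_reading_by_country(f))
TEMP_SAMPLE = (
    "dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude\n"
    "1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E\n"
    "1743-12-01,,,Århus,Denmark,57.05N,10.33E\n"
)
print(latest_reading_by_country(io.StringIO(TEMP_SAMPLE)))  # {'Denmark': '1743-11-01'}
```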


#### Explore Demography Data

In [11]:
df_demo = pd.read_csv('us-cities-demographics.csv', delimiter=';')
df_demo.head(5)

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


In [12]:
# .copy() avoids SettingWithCopyWarning when the columns are modified below
city_stats_dim = df_demo[['City', 'State', 'State Code', 'Median Age', 'Total Population', 'Average Household Size', 'Race']].copy()
city_stats_dim.columns = ['city', 'state', 'state_code', 'median_age', 'total_population', 'avg_household_size', 'race']
city_stats_dim['city'] = city_stats_dim['city'].str.upper()
city_stats_dim['state'] = city_stats_dim['state'].str.upper()
city_stats_dim.head()

| | city | state | state_code | median_age | total_population | avg_household_size | race |
|---|---|---|---|---|---|---|---|
| 0 | SILVER SPRING | MARYLAND | MD | 33.8 | 82463 | 2.6 | Hispanic or Latino |
| 1 | QUINCY | MASSACHUSETTS | MA | 41.0 | 93629 | 2.39 | White |
| 2 | HOOVER | ALABAMA | AL | 38.5 | 84839 | 2.58 | Asian |
| 3 | RANCHO CUCAMONGA | CALIFORNIA | CA | 34.5 | 175232 | 3.18 | Black or African-American |
| 4 | NEWARK | NEW JERSEY | NJ | 34.6 | 281913 | 2.73 | White |


#### Spark Data

In [13]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
df_spark.head()

Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2')

In [14]:
# Write the SAS data to Parquet once (left commented out after the first run),
# then work from the Parquet copy
# df_spark.write.parquet("sas_data")
df_spark = spark.read.parquet("sas_data")

#### Data Cleaning Step
1. Format Dates
2. Read through SAS labels to get country, state, and city codes. Create dimension lookup table for these fields.
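Step 1 relies on the SAS date convention: `arrdate` and `depdate` count days since 1960-01-01, which is what `dateFormat` below adds to. A standard-library sketch of the same conversion that also maps missing values to `None`; the `sas_to_date` name is a hypothetical helper, not part of the project code.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days):
    """Convert a SAS day count (e.g. 20566.0) to a date; missing values give None."""
    if sas_days is None or sas_days != sas_days:  # None, or NaN (NaN != NaN)
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20566.0))  # 2016-04-22
print(sas_to_date(None))     # None
```

The result for 20566.0 agrees with the `arrival_date` of cic_id 4084316 after formatting.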

In [15]:
# Format Dates: SAS stores dates as the number of days since 1960-01-01
def dateFormat(date):
    return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')

In [16]:
immigration_fact['arrival_date'] = dateFormat(immigration_fact['arrival_date'])
immigration_fact['departure_date'] = dateFormat(immigration_fact['departure_date'])
immigration_fact.head(5)

| | cic_id | year | month | city_code | state_code | arrival_date | departure_date | mode | visa | country |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 4084316.0 | 2016.0 | 4.0 | HHW | HI | 2016-04-22 | 2016-04-29 | 1.0 | 2.0 | United States |
| 1 | 4422636.0 | 2016.0 | 4.0 | MCA | TX | 2016-04-23 | 2016-04-24 | 1.0 | 2.0 | United States |
| 2 | 1195600.0 | 2016.0 | 4.0 | OGG | FL | 2016-04-07 | 2016-04-27 | 1.0 | 2.0 | United States |
| 3 | 5291768.0 | 2016.0 | 4.0 | LOS | CA | 2016-04-28 | 2016-05-07 | 1.0 | 2.0 | United States |
| 4 | 985523.0 | 2016.0 | 4.0 | CHM | NY | 2016-04-06 | 2016-04-09 | 3.0 | 2.0 | United States |


In [17]:
with open("I94_SAS_Labels_Descriptions.SAS") as f:
    contents = f.readlines()
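The next cells slice `contents` at hard-coded line ranges (`10:298`, `303:962`, `982:1036`), which silently break if the labels file is edited. A more defensive sketch groups code/label pairs by SAS `value` section instead. It assumes each lookup starts with a `value <format-name>` line such as `value i94cntyl`; those section names and the sample lines are illustrative assumptions, not checked against I94_SAS_Labels_Descriptions.SAS, and `parse_label_sections` is a hypothetical helper.

```python
import re

# A code/label pair such as "   236 =  'AFGHANISTAN'" or "\t'ANC'\t=\t'ANCHORAGE, AK'"
PAIR_RE = re.compile(r"^\s*'?(?P<code>[^'=\s]+)'?\s*=\s*'(?P<label>.*)'")
# Assumed section header such as "value i94cntyl" or "value $i94prtl"
SECTION_RE = re.compile(r"^\s*value\s+\$?(?P<name>\w+)", re.IGNORECASE)


def parse_label_sections(lines):
    """Group code/label pairs by the SAS `value` section they appear under."""
    sections = {}
    current = None
    for line in lines:
        header = SECTION_RE.match(line)
        if header:
            current = header.group('name').lower()
            sections[current] = {}
            continue
        pair = PAIR_RE.match(line)
        if pair and current is not None:
            sections[current][pair.group('code')] = pair.group('label').strip()
    return sections


sample_lines = [
    "/* I94CIT & I94RES */",
    "value i94cntyl",
    "   236 =  'AFGHANISTAN'",
    "   101 =  'ALBANIA'",
    ";",
    "value $i94prtl",
    "\t'ANC'\t=\t'ANCHORAGE, AK'",
    "value i94addrl",
    "\t'AK'='ALASKA'",
]
print(parse_label_sections(sample_lines))
```

If the assumption about section headers holds, `parse_label_sections(contents)` would replace the three slicing loops, and the lookups would no longer depend on line positions.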

In [18]:
country_code = {}
for countries in contents[10:298]:
    pair = countries.split('=')
    code, country = pair[0].strip(), pair[1].strip().strip("'")
    country_code[code] = country

In [19]:
df_country_code = pd.DataFrame(list(country_code.items()), columns=['code', 'country'])
df_country_code.head(5)

| | code | country |
|---|---|---|
| 0 | 236 | AFGHANISTAN |
| 1 | 101 | ALBANIA |
| 2 | 316 | ALGERIA |
| 3 | 102 | ANDORRA |
| 4 | 324 | ANGOLA |


In [20]:
city_code = {}
for cities in contents[303:962]:
    pair = cities.split('=')
    code, city = pair[0].strip("\t").strip().strip("'"), pair[1].strip('\t').strip().strip("''")
    city_code[code] = city

In [21]:
df_city_code = pd.DataFrame(list(city_code.items()), columns=['code', 'city'])
df_city_code.head(5)

| | code | city |
|---|---|---|
| 0 | ANC | ANCHORAGE, AK |
| 1 | BAR | BAKER AAF - BAKER ISLAND, AK |
| 2 | DAC | DALTONS CACHE, AK |
| 3 | PIZ | DEW STATION PT LAY DEW, AK |
| 4 | DTH | DUTCH HARBOR, AK |


In [22]:
state_code = {}
for states in contents[982:1036]:
    pair = states.split('=')
    code, state = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
    state_code[code] = state

In [23]:
df_state_code = pd.DataFrame(list(state_code.items()), columns=['code', 'state'])
df_state_code.head(5)

| | code | state |
|---|---|---|
| 0 | AK | ALASKA |
| 1 | AZ | ARIZONA |
| 2 | AR | ARKANSAS |
| 3 | CA | CALIFORNIA |
| 4 | CO | COLORADO |


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The data model is a star schema with immigration_fact as the fact table. Since this data warehouse will be used for OLAP and BI, a star schema is a good fit: every dimension table joins directly to the fact table, which keeps analytical queries simple. <br>

Please view the data_model.png file to see the schema.
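In a star schema each dimension joins directly to the fact table on a shared key (here `cic_id`), and analytical queries join and then aggregate. A minimal sketch of that query shape, using Python's built-in `sqlite3` as a stand-in for the Parquet tables, with only a subset of columns and the five sample rows from the exploration outputs:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
    CREATE TABLE immigration_fact (cic_id REAL, state_code TEXT, mode REAL, visa REAL);
    CREATE TABLE personal_dim (cic_id REAL, birth_year REAL, gender TEXT);
""")
conn.executemany(
    "INSERT INTO immigration_fact VALUES (?, ?, ?, ?)",
    [(4084316.0, 'HI', 1.0, 2.0), (4422636.0, 'TX', 1.0, 2.0),
     (1195600.0, 'FL', 1.0, 2.0), (5291768.0, 'CA', 1.0, 2.0),
     (985523.0, 'NY', 3.0, 2.0)])
conn.executemany(
    "INSERT INTO personal_dim VALUES (?, ?, ?)",
    [(4084316.0, 1955.0, 'F'), (4422636.0, 1990.0, 'M'),
     (1195600.0, 1940.0, 'M'), (5291768.0, 1991.0, 'M'),
     (985523.0, 1997.0, 'F')])

# Fact rows joined to one dimension, then aggregated: the typical OLAP query shape
arrivals_by_gender = conn.execute("""
    SELECT p.gender, COUNT(*) AS arrivals
    FROM immigration_fact AS f
    JOIN personal_dim AS p ON p.cic_id = f.cic_id
    GROUP BY p.gender
    ORDER BY p.gender
""").fetchall()
print(arrivals_by_gender)  # [('F', 2), ('M', 3)]
```

With Spark, the same SQL could run through `spark.sql` after registering the Parquet tables as temporary views with `createOrReplaceTempView`.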

#### 3.2 Mapping Out Data Pipelines
1. Assume all data sets are stored in S3 buckets as below
	* `[Source_S3_Bucket]/immigration/18-83510-I94-Data-2016/*.sas7bdat`
	* `[Source_S3_Bucket]/I94_SAS_Labels_Descriptions.SAS`
	* `[Source_S3_Bucket]/temperature/GlobalLandTemperaturesByCity.csv`
	* `[Source_S3_Bucket]/demography/us-cities-demographics.csv`
2. Clean the data sets using the cleaning steps from Step 2.
3. Transform the immigration data into 1 fact table (partitioned by state) and 2 dimension tables.
4. Parse the label description file to get lookup tables for country, state, and city codes.
5. Transform the temperature data into a dimension table.
6. Split the demography data into 2 dimension tables.
7. Store these tables in the target S3 bucket.
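Step 3 partitions the fact table by state. Spark's `partitionBy('state_code')` writes one Hive-style `state_code=<value>` directory per state and leaves the partition column out of the data files, so queries that filter on `state_code` can skip the other states' directories. The standard-library sketch below only illustrates that layout: it writes CSV rather than Parquet, and the `write_partitioned` helper and `part-00000.csv` file name are hypothetical.

```python
import csv
import os
import tempfile
from collections import defaultdict


def write_partitioned(rows, root, partition_col):
    """Write dict rows under Hive-style <partition_col>=<value> directories.

    As with Spark's partitionBy, the partition column is encoded in the
    directory name and omitted from the data files themselves.
    """
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_col]].append(row)
    for value, group in groups.items():
        part_dir = os.path.join(root, f"{partition_col}={value}")
        os.makedirs(part_dir, exist_ok=True)
        fields = [name for name in group[0] if name != partition_col]
        with open(os.path.join(part_dir, "part-00000.csv"), "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
            writer.writeheader()
            writer.writerows(group)
    return sorted(os.listdir(root))


rows = [
    {'cic_id': 4084316.0, 'state_code': 'HI', 'mode': 1.0},
    {'cic_id': 4422636.0, 'state_code': 'TX', 'mode': 1.0},
    {'cic_id': 1195600.0, 'state_code': 'FL', 'mode': 1.0},
]
with tempfile.TemporaryDirectory() as root:
    print(write_partitioned(rows, root, 'state_code'))
    # ['state_code=FL', 'state_code=HI', 'state_code=TX']
```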

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [24]:
# setup logging 
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# AWS configuration
config = configparser.ConfigParser()
config.read('config.cfg', encoding='utf-8-sig')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']
SOURCE_S3_BUCKET = config['S3']['SOURCE_S3_BUCKET']
DEST_S3_BUCKET = config['S3']['DEST_S3_BUCKET']
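The cell above expects a `config.cfg` with `[AWS]` and `[S3]` sections. A minimal sketch of that file, parsed with `configparser.read_string`; every value is a placeholder. Because output paths are built by string concatenation (`output_data + 'immigration_fact'`), the bucket URLs need a trailing slash, and the traceback further down shows the pipeline reading through the `s3a://` scheme. The `sample_config` name avoids clobbering the notebook's `config`.

```python
import configparser

# Hypothetical config.cfg contents: every value is a placeholder.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = <your-access-key-id>
AWS_SECRET_ACCESS_KEY = <your-secret-access-key>

[S3]
SOURCE_S3_BUCKET = s3a://<source-bucket>/
DEST_S3_BUCKET = s3a://<dest-bucket>/
"""

sample_config = configparser.ConfigParser()
sample_config.read_string(SAMPLE_CFG)

# Paths are built as bucket + 'table_name', so each URL needs a trailing slash
for key in ('SOURCE_S3_BUCKET', 'DEST_S3_BUCKET'):
    url = sample_config['S3'][key]
    if not (url.startswith('s3a://') and url.endswith('/')):
        raise ValueError(f"{key} should look like s3a://bucket/, got {url!r}")
print(dict(sample_config['S3']))
```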

In [25]:
# data processing functions
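def create_spark_session():
    # spark.jars.packages only takes effect when a new session starts. getOrCreate()
    # reuses the session built in the "Spark Data" cell, so restart the kernel first;
    # otherwise hadoop-aws is never loaded, which is likely why the run below fails with
    # ClassNotFoundException: org.apache.hadoop.fs.s3a.S3AFileSystem.
    # The hadoop-aws version should match the Hadoop version bundled with Spark.
    spark = SparkSession.builder\
        .config("spark.jars.repositories", "https://repos.spark-packages.org/")\
        .config("spark.jars.packages",
                "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0")\
        .enableHiveSupport().getOrCreate()
    return spark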

In [26]:
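def dateFormat(date):
    """Convert a SAS date (days since 1960-01-01) to a Python date; None stays None."""
    if date is not None:
        return (pd.Timestamp('1960-01-01') + pd.to_timedelta(date, unit='D')).date()
    return None

# Defined at module level (not inside the function) so process_immigration_data can use it
dateFormat_udf = udf(dateFormat, DateType())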

In [27]:
def rename_columns(table, new_columns):
    for original, new in zip(table.columns, new_columns):
        table = table.withColumnRenamed(original, new)
    return table
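`rename_columns` pairs names positionally with `zip`, which stops at the shorter list: a column appended after the `select` (such as `immigration_id`) silently keeps its name, and a missing or extra entry in `new_columns` would mislabel columns without any error. A stricter sketch that fails loudly when the counts differ; `checked_renames` is a hypothetical helper, shown here on the airline columns.

```python
def checked_renames(old_columns, new_columns, passthrough=()):
    """Map old -> new column names positionally, raising on a count mismatch.

    Columns named in `passthrough` (e.g. a generated id column) are skipped
    and keep their original names.
    """
    renamable = [c for c in old_columns if c not in passthrough]
    if len(renamable) != len(new_columns):
        raise ValueError(
            f"{len(renamable)} columns to rename but {len(new_columns)} new names")
    return dict(zip(renamable, new_columns))


old = ['cicid', 'airline', 'admnum', 'fltno', 'visatype', 'airline_id']
new = ['cic_id', 'airline', 'admn_num', 'flight_num', 'visa_type']
print(checked_renames(old, new, passthrough=('airline_id',)))

# With Spark, the mapping could drive withColumnRenamed, e.g.:
# for before, after in checked_renames(table.columns, new, ('airline_id',)).items():
#     table = table.withColumnRenamed(before, after)
```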

In [28]:
def process_immigration_data(spark, input_data, output_data):
    """Process immigration data to get immigration_fact, 
    personal_dim and airline_dim tables
        Arguments:
            spark {object}: SparkSession object
            input_data {object}: Source S3 endpoint
            output_data {object}: Target S3 endpoint
        Returns:
            None
    """

    logging.info("Start processing immigration")
    
    # read immigration data file
    immi_data = os.path.join(input_data, 'immigration/18-83510-I94-Data-2016/*.sas7bdat')
    df = spark.read.format('com.github.saurfang.sas.spark').load(immi_data)

    logging.info("Start processing immigration_fact")
    # extract columns to create immigration_fact table
    immigration_fact = df.select('cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr',\
                                 'arrdate', 'depdate', 'i94mode', 'i94visa').distinct()\
                         .withColumn("immigration_id", monotonically_increasing_id())
    
    # data wrangling to match data model
    new_columns = ['cic_id', 'year', 'month', 'city_code', 'state_code',\
                   'arrive_date', 'departure_date', 'mode', 'visa']
    immigration_fact = rename_columns(immigration_fact, new_columns)

    immigration_fact = immigration_fact.withColumn('country', lit('United States'))
    immigration_fact = immigration_fact.withColumn('arrive_date', \
                                        dateFormat_udf(col('arrive_date')))
    immigration_fact = immigration_fact.withColumn('departure_date', \
                                        dateFormat_udf(col('departure_date')))

    # write immigration_fact table to parquet files partitioned by state
    immigration_fact.write.mode("overwrite").partitionBy('state_code')\
                    .parquet(path=output_data + 'immigration_fact')

    logging.info("Start processing personal_dim")
    # extract columns to create personal_dim table
    personal_dim = df.select('cicid', 'i94cit', 'i94res',\
                                  'biryear', 'gender').distinct()\
                          .withColumn("personal_id", monotonically_increasing_id())
    
    # data wrangling to match data model
    new_columns = ['cic_id', 'citizen_country', 'residence_country',\
                   'birth_year', 'gender']
    personal_dim = rename_columns(personal_dim, new_columns)

    # write personal_dim table to parquet files
    personal_dim.write.mode("overwrite")\
                     .parquet(path=output_data + 'personal_dim')

    logging.info("Start processing airline_dim")
    # extract columns to create airline_dim table
    airline_dim = df.select('cicid', 'airline', 'admnum', 'fltno', 'visatype').distinct()\
                         .withColumn("airline_id", monotonically_increasing_id())
    
    # data wrangling to match data model
    new_columns = ['cic_id', 'airline', 'admn_num', 'flight_num', 'visa_type']
    airline_dim = rename_columns(airline_dim, new_columns)

    # write airline_dim table to parquet files
    airline_dim.write.mode("overwrite")\
                    .parquet(path=output_data + 'airline_dim')
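Each table above gets a surrogate key from `monotonically_increasing_id()`. Per the PySpark documentation, the generated ids are unique and increasing but not consecutive: the current implementation puts the partition id in the upper 31 bits and the row number within the partition in the lower 33 bits. Since the values depend on how the data happens to be partitioned, a rerun may assign different ids to the same row, so `cic_id`, which every immigration table carries, is the more stable join key. A pure-Python sketch of that bit layout (`spark_style_id` is a hypothetical helper):

```python
def spark_style_id(partition_id, row_in_partition):
    """Reproduce the documented layout of monotonically_increasing_id():
    partition id in the upper 31 bits, row number in the lower 33 bits."""
    return (partition_id << 33) + row_in_partition


# Three rows in each of two partitions: ids jump by 2**33 between partitions
ids = [spark_style_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```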

In [29]:
def process_label_descriptions(spark, input_data, output_data):
    """ Parsing label desctiption file to get codes of country, city, state
        Arguments:
            spark {object}: SparkSession object
            input_data {object}: Source S3 endpoint
            output_data {object}: Target S3 endpoint
        Returns:
            None
    """

    logging.info("Start processing label descriptions")
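    label_file = os.path.join(input_data, "I94_SAS_Labels_Descriptions.SAS")
    # open() cannot read s3a:// paths, so read the file through Spark and
    # collect its lines to the driver (in file order)
    contents = spark.sparkContext.textFile(label_file).collect()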

    country_code = {}
    for countries in contents[10:298]:
        pair = countries.split('=')
        code, country = pair[0].strip(), pair[1].strip().strip("'")
        country_code[code] = country
    spark.createDataFrame(country_code.items(), ['code', 'country'])\
         .write.mode("overwrite")\
         .parquet(path=output_data + 'country_code')

    city_code = {}
    for cities in contents[303:962]:
        pair = cities.split('=')
        code, city = pair[0].strip("\t").strip().strip("'"),\
                     pair[1].strip('\t').strip().strip("''")
        city_code[code] = city
    spark.createDataFrame(city_code.items(), ['code', 'city'])\
         .write.mode("overwrite")\
         .parquet(path=output_data + 'city_code')

    state_code = {}
    for states in contents[982:1036]:
        pair = states.split('=')
        code, state = pair[0].strip('\t').strip("'"), pair[1].strip().strip("'")
        state_code[code] = state
    spark.createDataFrame(state_code.items(), ['code', 'state'])\
         .write.mode("overwrite")\
         .parquet(path=output_data + 'state_code')

In [30]:
def process_temperature_data(spark, input_data, output_data):
    """ Process temperature data to get temperature_dim table
        Arguments:
            spark {object}: SparkSession object
            input_data {object}: Source S3 endpoint
            output_data {object}: Target S3 endpoint
        Returns:
            None
    """

    logging.info("Start processing temperature_dim")
    # read temperature data file
    temp_data = os.path.join(input_data, 'temperature/GlobalLandTemperaturesByCity.csv')
    df = spark.read.csv(temp_data, header=True)

    df = df.where(df['Country'] == 'United States')
    temperature_dim = df.select(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty',\
                         'City', 'Country']).distinct()

    new_columns = ['dt', 'avg_temp', 'avg_temp_uncertainty', 'city', 'country']
    temperature_dim = rename_columns(temperature_dim, new_columns)

    temperature_dim = temperature_dim.withColumn('dt', to_date(col('dt')))
    temperature_dim = temperature_dim.withColumn('year', year(temperature_dim['dt']))
    temperature_dim = temperature_dim.withColumn('month', month(temperature_dim['dt']))
 
    # write temperature_dim table to parquet files
    temperature_dim.write.mode("overwrite")\
                   .parquet(path=output_data + 'dim_temperature')

In [31]:
def process_demography_data(spark, input_data, output_data):
    """ Process demograpy data to get city_stats_dim table
        Arguments:
            spark {object}: SparkSession object
            input_data {object}: Source S3 endpoint
            output_data {object}: Target S3 endpoint
        Returns:
            None
    """

    logging.info("Start processing city_stats_dim")
    # read demography data file
    demo_data = os.path.join(input_data, 'demography/us-cities-demographics.csv')
    df = spark.read.format('csv').options(header=True, delimiter=';').load(demo_data)

    city_stats_dim = df.select(['City', 'State', 'State Code', 'Median Age', 'Total Population', 'Average Household Size', 'Race'])\
                             .distinct()\
                             .withColumn("stat_id", monotonically_increasing_id())
    
    new_columns = ['city', 'state', 'state_code', 'median_age', 'total_population', 'avg_household_size', 'race']
    city_stats_dim = rename_columns(city_stats_dim, new_columns)
    city_stats_dim = city_stats_dim.withColumn('city', upper(col('city')))
    city_stats_dim = city_stats_dim.withColumn('state', upper(col('state')))

    # write city_stats_dim table to parquet files
    city_stats_dim.write.mode("overwrite")\
                        .parquet(path=output_data + 'city_stats_dim')
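Step 6 of the pipeline mapping describes two demography dimension tables, while `process_demography_data` writes only `city_stats_dim`. The dataset stores one row per city and race (the `Race` and `Count` columns) and repeats the city-level columns on each of those rows, so one possible split is a city-level table plus a per-race count table. A standard-library sketch of that split on the raw `;`-delimited file; the `split_demography` helper and the columns kept are assumptions, and the demo rows are the Quincy and Newark rows shown earlier.

```python
import csv
import io


def split_demography(csv_file):
    """Split the ';'-delimited demographics CSV into city-level and per-race rows.

    City-level columns repeat on every race row of a city, so only the first
    copy per (city, state_code) is kept.
    """
    city_stats = {}
    race_counts = []
    for row in csv.DictReader(csv_file, delimiter=';'):
        key = (row['City'].upper(), row['State Code'])
        city_stats.setdefault(key, {
            'city': key[0],
            'state_code': key[1],
            'median_age': float(row['Median Age']),
            'total_population': int(row['Total Population']),
        })
        race_counts.append({
            'city': key[0],
            'state_code': key[1],
            'race': row['Race'],
            'count': int(row['Count']),
        })
    return list(city_stats.values()), race_counts


DEMO_SAMPLE = (
    "City;State;Median Age;Male Population;Female Population;Total Population;"
    "Number of Veterans;Foreign-born;Average Household Size;State Code;Race;Count\n"
    "Quincy;Massachusetts;41.0;44129;49500;93629;4147;32935;2.39;MA;White;58723\n"
    "Newark;New Jersey;34.6;138040;143873;281913;5829;86253;2.73;NJ;White;76402\n"
)
city_rows, race_rows = split_demography(io.StringIO(DEMO_SAMPLE))
print(len(city_rows), len(race_rows))  # 2 2
```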

In [32]:
def main():
    spark = create_spark_session()
    input_data = SOURCE_S3_BUCKET
    output_data = DEST_S3_BUCKET
    
    process_immigration_data(spark, input_data, output_data)    
    process_label_descriptions(spark, input_data, output_data)
    process_temperature_data(spark, input_data, output_data)
    process_demography_data(spark, input_data, output_data)
    logging.info("Data processing completed")


if __name__ == "__main__":
    main()

INFO:root:Start processing immigration


Py4JJavaError: An error occurred while calling o63.load.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2654)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at com.github.saurfang.sas.spark.SasRelation.inferSchema(SasRelation.scala:114)
	at com.github.saurfang.sas.spark.SasRelation.<init>(SasRelation.scala:50)
	at com.github.saurfang.sas.spark.SasRelation$.apply(SasRelation.scala:42)
	at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:50)
	at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:39)
	at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:27)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
	... 28 more


#### 4.2 Data Quality Checks
The following data quality checks confirm that the pipeline ran as expected:
* Schema check: each output table's schema matches the data model in data_model.png.
* Completeness check: each output table contains at least one record.
 
Run Quality Checks

#### Ensure the data model matches data_model.png

In [None]:
# Output tables written by the pipeline, relative to DEST_S3_BUCKET
# (pathlib's iterdir() cannot list S3 prefixes, so the names are listed explicitly)
tables = ['immigration_fact', 'personal_dim', 'airline_dim',
          'country_code', 'city_code', 'state_code',
          'dim_temperature', 'city_stats_dim']

for table in tables:
    df = spark.read.parquet(DEST_S3_BUCKET + table)
    print("Table: " + table)
    df.printSchema()  # prints directly and returns None

#### Ensure Data was loaded into tables

In [None]:
for table in tables:
    df = spark.read.parquet(DEST_S3_BUCKET + table)
    record_num = df.count()
    if record_num <= 0:
        raise ValueError(f"Table {table} is empty!")
    print(f"Table: {table} has {record_num} total records.")

#### 4.3 Data dictionary 
Please view data_dictionary.png.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

#### Tools and Technologies
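S3 stores both the raw source data and the output tables: it is inexpensive, durable object storage that Spark can read from and write to directly. Pandas was used for data exploration and analysis. PySpark was used to process and transform the full datasets into the data warehouse tables, because it distributes the work across a cluster and can handle data too large for a single machine's memory.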

#### Data Update Frequency
Data should be refreshed monthly: the raw I94 immigration data is delivered as one file per month (e.g. i94_apr16_sub.sas7bdat), and the immigration_fact and temperature tables carry year and month columns.
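A monthly refresh means loading one new I94 file per month. Assuming every monthly file follows the pattern of `i94_apr16_sub.sas7bdat` (lowercase three-letter month, two-digit year) and lives under the same `immigration/18-83510-I94-Data-2016/` prefix used by the pipeline, a sketch for building the next file's path; `i94_monthly_path` and the bucket value are hypothetical, and files for other years may well sit under a different folder.

```python
MONTH_ABBR = ('jan', 'feb', 'mar', 'apr', 'may', 'jun',
              'jul', 'aug', 'sep', 'oct', 'nov', 'dec')


def i94_monthly_path(source_bucket, year, month):
    """Build the S3 path of one month's I94 file, assuming the
    i94_<mon><yy>_sub.sas7bdat naming seen in i94_apr16_sub.sas7bdat."""
    if not 1 <= month <= 12:
        raise ValueError(f"month must be 1-12, got {month}")
    file_name = f"i94_{MONTH_ABBR[month - 1]}{year % 100:02d}_sub.sas7bdat"
    return f"{source_bucket}immigration/18-83510-I94-Data-2016/{file_name}"


print(i94_monthly_path('s3a://<source-bucket>/', 2016, 4))
# s3a://<source-bucket>/immigration/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat
```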

#### Other Scenarios
1. If the data was increased by 100x <br>
    Run the Spark jobs on an AWS EMR cluster, or load the data into Amazon Redshift, since a single standalone machine would likely lack the memory and compute to process that volume in a reasonable time. <br>
2. If the data populates a dashboard that must be updated on a daily basis by 7am every day. <br>
    Create separate tables that are refreshed daily with the latest data. An Apache Airflow DAG scheduled to run daily, early enough to finish before 7am, could orchestrate this, with monitoring, retries, and failure alerts to confirm that each run succeeds. <br>
3. If the database needed to be accessed by 100+ people. <br>
    Amazon Redshift can handle several hundred connections at once. While this may be more expensive, it would be the most obvious route to try first to provide that level of access to that many people.