# Udacity Data Engineering Capstone Project: Version 1.0

### Tools and Libraries: PostgreSQL, PySpark, Psycopg2, Pandas, SQLAlchemy.

This file is my first run at the project. I've chosen to do this on a local installation of PostgreSQL. Primarily, this is an exercise to get PostgreSQL up and running for later use. It'll be a great option when I don't want to incur the costs associated with cloud services on AWS / Azure / Watson. Go up a level in this repository to see the solutions I developed using different technologies.

## Set-up Process

Prior to running this on your machine, you need to install PostgreSQL and Spark. Spark requires Java so if you don't already have that, you'll need to install it too. These are the steps that worked for me. My adventures in StackOverflow have told me that no two Windows 10 PCs are the same so I expect that your process will look slightly different in the end.

1. Install PostgreSQL locally. You can get it here:  
    https://www.enterprisedb.com/downloads/postgres-postgresql-downloads
2. I used the superuser 'postgres' with the password 'postgres'.
3. Create local postgres database.
4. Add details to a configuration file, dl.cfg.
5. If running on Windows, you may need to install or update Java. Instructions:               
    https://towardsdatascience.com/installing-apache-pyspark-on-windows-10-f5f0c506bea1
6. I had to get the JDK. Make sure you get JDK version 8 and not a later release, as Spark is not yet compatible with later versions. Installation instructions are in the first link and the download files are in the second:  
    https://docs.oracle.com/javase/7/docs/webnotes/install/windows/jdk-installation-windows.html  
    https://www.oracle.com/java/technologies/javase-jdk14-downloads.html
7. An important step in the JDK install is to add JAVA_HOME as an environment variable. You can't have a space in the path name for this. I got around it by referring to '\Program Files\' as '\Progra~1\'. 
    + While you're here, you can set the SPARK_HOME environment variable too.
8. It was also necessary to download 7Zip (or similar) to unpack the downloaded Apache Spark tgz file:  
    https://www.7-zip.org/download.html
9. More useful notes here:  
    https://medium.com/big-data-engineering/how-to-install-apache-spark-2-x-in-your-pc-e2047246ffc3
10. Get the PostgreSQL JDBC driver. This enables writing from a Spark dataframe to a PostgreSQL database:  
    https://jdbc.postgresql.org/download.html
11. Notes on the JDBC driver:  
    https://jdbc.postgresql.org/documentation/81/setup.html

**NB. In the function that creates a SparkSession below, I've included two '.config' lines. The second of these is necessary to handle the JDBC driver.**

12. Import the SAS data immigration files. I copied them from the Udacity workspace to my local drive by running this line in a Terminal:

> !zip -r data.zip ../../data/18-83510-I94-Data-2016  

    This created a zip file containing all files within the sas_data folder on the Udacity workspace. I downloaded and unzipped this file to work with the data. The sas_data folder was created locally in the same place as this notebook file. I also downloaded I94_SAS_Labels_Descriptions.SAS to the same location.
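
The environment-variable requirement in step 7 can be checked before starting Spark. The sketch below is only an illustration: `check_spark_env` is a hypothetical helper that is not part of this notebook, and the example value is made up. It reports variables that are missing or whose values contain a space.

```python
import os

def check_spark_env(env=None, names=("JAVA_HOME", "SPARK_HOME")):
    """Return a list of problems found with the given environment variables."""
    env = os.environ if env is None else env
    problems = []
    for name in names:
        value = env.get(name)
        if not value:
            problems.append(f"{name} is not set")
        elif " " in value:
            problems.append(f"{name} contains a space: {value}")
    return problems

# Made-up example: JAVA_HOME contains a space and SPARK_HOME is missing.
example_env = {"JAVA_HOME": r"C:\Program Files\Java\jdk1.8.0_251"}
example_problems = check_spark_env(example_env)
print(example_problems)
```

Calling `check_spark_env()` with no arguments inspects the real environment of the current process.
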

In [1]:
## Run these installs if you haven't already installed the packages on your machine.

#!pip install pyspark
#pip install psycopg2
#conda install pyspark
#!pip install findspark # run once all of the spark installation steps have been followed.

import psycopg2 # PostgreSQL database adapter for Python
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT # lets each statement commit without an explicit transaction
import configparser # to work with the configuration file
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import (StructType, StructField, StringType, DoubleType, IntegerType, TimestampType,FloatType)
import pandas as pd
import findspark
import pyspark
from sqlalchemy import create_engine # allows writing to PostgreSQL from pandas

config = configparser.ConfigParser()
config.read('dl.cfg') # edit this file to include your own values.

#the variables below are set from those you entered into 'dl.cfg'
POSTGRES_USER=config['POSTGRES']['POSTGRES_USER']
POSTGRES_PASSWORD=config['POSTGRES']['POSTGRES_PASSWORD']
POSTGRES_DATABASE=config['POSTGRES']['POSTGRES_DATABASE']
SAS_DATA_LOCATION=config['OTHER']['SAS_DATA_LOCATION']
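
For reference, a `dl.cfg` with the layout the code above expects might look like the sample embedded in the sketch below. The section and key names come from the lookups above. The values are placeholders (the `sas_data/` path in particular is an assumption) and should be replaced with your own. The trailing slash on `SAS_DATA_LOCATION` matters because the SAS file name is appended to it directly when the immigration data is loaded.

```python
import configparser

# Placeholder contents for dl.cfg; replace the values with your own.
SAMPLE_CFG = """
[POSTGRES]
POSTGRES_USER = postgres
POSTGRES_PASSWORD = postgres
POSTGRES_DATABASE = udacity_capstone

[OTHER]
SAS_DATA_LOCATION = sas_data/
"""

sample = configparser.ConfigParser()
sample.read_string(SAMPLE_CFG)

# List any expected key that is absent, so a typo in the file is caught early.
expected = {"POSTGRES": ["POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DATABASE"],
            "OTHER": ["SAS_DATA_LOCATION"]}
missing = [f"{section}.{key}"
           for section, keys in expected.items()
           for key in keys
           if not sample.has_option(section, key)]
print("missing keys:", missing)
```
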

In [None]:
# Optional step to clear tables from PostgreSQL before you start. Run the cell that defines exec_psql first.
exec_psql('drop table if exists temperatures;')
exec_psql('drop table if exists demographics;')
exec_psql('drop table if exists visa_group_sas;')
exec_psql('drop table if exists fact_immigration;')
exec_psql('drop table if exists fact_temperatures;')
exec_psql('drop table if exists airports_iata;')
exec_psql('drop table if exists dim_date;')
exec_psql('drop table if exists immigration;')
exec_psql('drop table if exists country_sas;')
exec_psql('drop table if exists airport_city_state;')
exec_psql('drop table if exists dim_arrival_mode;')
exec_psql('drop table if exists airport_sas;')
exec_psql('drop table if exists dim_country;')
exec_psql('drop table if exists dim_airport;')
exec_psql('drop table if exists dim_visa_group;')
exec_psql('drop table if exists arrival_mode_sas;')
exec_psql('drop table if exists dim_us_state;')
exec_psql('drop table if exists us_state_sas;')
exec_psql('drop table if exists fact_demographics;')


### Create a function to run SQL queries on a local installation of PostgreSQL.

This function handles select queries differently to DDL/DML statements, returning a recordset as a pandas dataframe for the former.

In [2]:
def exec_psql(input_sql = "select '*No SQL statement provided.';",print_cols = "No"): 
    """
    Connect to the postgres database and execute a query.

    For a 'select', return the rows as a pandas dataframe. If print_cols is a
    table name, print that table's column definitions first.
    For any other statement, execute it and return None.
    """
    con = None
    try:
        con = psycopg2.connect(host='localhost',database=POSTGRES_DATABASE, user=POSTGRES_USER,password=POSTGRES_PASSWORD)
        con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        
        #handle action queries       
        if input_sql.strip()[0:6].lower() != 'select':
            cur = con.cursor()
            cur.execute(input_sql)
            cur.close()
            print(f'Executed non-select query: {input_sql}')
        else:
            # this argument allows you to view the schema of a table
            if print_cols != "No":
                df_c = pd.read_sql(f"select column_name, data_type, character_maximum_length \
                from INFORMATION_SCHEMA.COLUMNS where table_name ='{print_cols}';", con)
                print(df_c)
            # allow the user to see all of the data requested on screen. these options are left
            # in place so that the returned dataframe displays in full; to restore the defaults
            # afterwards, run pd.reset_option('^display.', silent=True)
            pd.options.display.max_columns = None
            pd.options.display.max_rows = None
            return pd.read_sql(input_sql, con)
            
    except Exception as e:
        print('Error: ',e)
    finally:
        # always release the connection, including after an error
        if con is not None:
            con.close()
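
The branch taken by `exec_psql` depends only on whether the statement starts with the word 'select'. The sketch below isolates that test in a hypothetical helper, `is_select`, to show which statements would be treated as queries. Under this rule, a query that begins with a comment or a `with` clause goes down the non-select path, so no dataframe comes back for it.

```python
def is_select(input_sql):
    """Mirror the prefix test used in exec_psql: True when the SQL starts with 'select'."""
    return input_sql.strip()[0:6].lower() == 'select'

examples = [
    "  SELECT * from dim_date limit 5",
    "drop table if exists dim_date;",
    "with d as (select 1) select * from d",
    "-- count rows\nselect count(*) from dim_date",
]
for statement in examples:
    # show the decision next to the first line of each statement
    print(is_select(statement), '|', statement.strip().splitlines()[0])
```
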

# Move data from source to PostgreSQL staging tables

The next two cells define two functions. The first creates a Spark session. The second utilises a Spark session to build a Spark dataframe from an input file and then writes the data to a specified table in a specified PostgreSQL database.

In [3]:
def create_spark_session():
    """Create a spark session in which to work on the data."""
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
        .config("spark.driver.extraClassPath", "C:/Progra~1/Java/jdk1.8.0_251/postgresql-42.2.14.jar")\
        .getOrCreate()
    return spark

In [4]:

def spark_to_postgresql(read_format,fpath,tname,delimiter=','):
    """
    Create a Spark dataframe from an input file and write it to a PostgreSQL table,
    replacing the table if it already exists.
    
    Args:
    
    read_format: E.g. csv.
    fpath: Full path for your input file, e.g. 'c:/your_file.csv'.
    tname: The name of the table to write to in PostgreSQL.
    delimiter: E.g. ','
    """
    spark = create_spark_session()
    df =spark.read.format(read_format) \
                  .option("header","true") \
                  .option("delimiter",delimiter) \
                  .load(fpath)
    exec_psql(f"drop table if exists {tname}")
    df.write.jdbc(f"jdbc:postgresql:{POSTGRES_DATABASE}", tname,
    properties={"user": POSTGRES_USER, "password": POSTGRES_PASSWORD})
    print(tname,f': copied to postgres {POSTGRES_DATABASE}')
    print(' ')
    #display ten rows from the postgres table for the user to view. the dataframe returned by exec_psql must be printed to appear
    print(exec_psql(f"select * from {tname} limit 10",tname))


Now we use the above function to create PySpark dataframes of the SAS immigration data and copy this to the local PostgreSQL database.

Spark is generally used for handling large datasets. Moving a large dataset to a local PostgreSQL database would be slow and certainly not best practice. However, the dataset here is not large, and my aim is to demonstrate the use of Spark to read and write a dataset.

Similarly, dataframes are created from three more source files, each of which was provided on the Udacity workspace as a CSV and copied locally to work with:
   + Temperature Data
   + Demographics
   + Airports

In [5]:
# build a dictionary of arguments for the four input files
parameters_dict = {'immigration': {'read_format':'com.github.saurfang.sas.spark','fpath':SAS_DATA_LOCATION + 'i94_apr16_sub.sas7bdat','delimiter':','},
       'temperatures': {'read_format':'csv','fpath':'GlobalLandTemperaturesByCity.csv','delimiter':','},
       'demographics': {'read_format':'csv','fpath':'us-cities-demographics.csv','delimiter':';'},
       'airports_iata': {'read_format':'csv','fpath':'airport-codes_csv.csv','delimiter':','}
      }

# iterate through the dictionary, writing each dataframe to PostgreSQL. we can look at the schema for each staging table too.
for tname, params in parameters_dict.items():
    spark_to_postgresql(params['read_format'], params['fpath'], tname, params['delimiter'])

Executed non-select query: drop table if exists immigration
immigration : copied to postgres udacity_capstone
 
   column_name         data_type character_maximum_length
0        cicid  double precision                     None
1        i94yr  double precision                     None
2       i94mon  double precision                     None
3       i94cit  double precision                     None
4       i94res  double precision                     None
5      i94port              text                     None
6      arrdate  double precision                     None
7      i94mode  double precision                     None
8      i94addr              text                     None
9      depdate  double precision                     None
10      i94bir  double precision                     None
11     i94visa  double precision                     None
12       count  double precision                     None
13    dtadfile              text                     None
14    visapost    

Create pandas dataframes from the SAS look-up data in the file 'I94_SAS_Labels_Descriptions.SAS' (copied locally from Udacity workspace). These are then copied to PostgreSQL.

In [None]:
#this is the lookup data
with open('I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')

def code_mapper(file, idx):
    """
    From udacity knowledge base.
    Sources lookup values from the SAS file content passed in as 'file',
    starting at the label 'idx' and stopping at the next ';'.
    Returns a dictionary.
    """
    f_content2 = file[file.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    f_content2 = [i.replace("'", "") for i in f_content2]
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic

i94cit = code_mapper(f_content, "i94cntyl")
i94port = code_mapper(f_content, "i94prtl")
i94mode = code_mapper(f_content, "i94model")
i94addr = code_mapper(f_content, "i94addrl")
i94visa = {'1':'Business',
'2': 'Pleasure',
'3' : 'Student'}
#replace the lookup value for code 582 with the plain country name
i94cit['582'] = 'MEXICO'

#this creates a list that contains the dictionaries for each field and the new description for it.
pairs = [(i94cit, 'country'), (i94port,'airport') #use i94cit/i94res : same lookup values
         ,(i94mode,'arrival_mode'),(i94addr,'us_state'),(i94visa,'visa_group')]

#get the lookup data into a pd dataframe and copy to PostgreSQL
engine = create_engine('postgresql://{}:{}@localhost:5432/{}'.format(POSTGRES_USER,POSTGRES_PASSWORD,POSTGRES_DATABASE))
for (data, name) in pairs: # 'name' avoids shadowing the pyspark col() function imported above
    df = pd.DataFrame.from_dict(data, orient='index', columns=[name])
    df.reset_index(level=0, inplace=True)
    df.columns = [name + '_cd', name] # e.g. country_cd, country
    exec_psql(f'drop table if exists {name}_sas;')
    df.to_sql(name + '_sas', engine)
    print(name,f': copied to postgres {POSTGRES_DATABASE}')   
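
To make the parsing step concrete, the sketch below applies the same slicing logic to a few lines written in the style of the SAS labels file. `parse_lookup` is a hypothetical stand-alone variant of `code_mapper`, and the sample text is typed in by hand to resemble the `i94model` block rather than read from the real file.

```python
# Hand-typed sample: a label line, tab-indented "code = 'description'" lines, then ';'.
SAMPLE = """
value i94model
\t1 = 'Air'
\t2 = 'Sea'
\t3 = 'Land'
\t9 = 'Not reported' ;
"""

def parse_lookup(text, label):
    """Return {code: description} for the block that starts at label and ends at ';'."""
    block = text[text.index(label):]        # drop everything before the label
    block = block[:block.index(';')]        # keep up to, but not including, the ';'
    lines = block.replace("'", "").split('\n')[1:]   # skip the label line itself
    pairs = (line.split('=') for line in lines)
    return {p[0].strip(): p[1].strip() for p in pairs if len(p) == 2}

lookup = parse_lookup(SAMPLE, "i94model")
print(lookup)
```

Because the slice stops at the first ';' after the label, a description that itself contained a semicolon would cut the block short.
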


View the tables created so far:

In [None]:
exec_psql("""
    SELECT table_name
    FROM information_schema.tables
    WHERE table_type = 'BASE TABLE'
    AND table_schema NOT IN ('pg_catalog', 'information_schema')
    ;
    """)

# The Schema

![](database_diagram.png)

# Data Dictionary

![](data_dictionary.PNG)

### Notes on the schema

The aim of the schema design I've created is to maximise the potential for analysis using the provided data sources. The focus of the analysis is expected to be the fact_immigration table. Analysts are expected to be looking for insights into instances of persons travelling to the USA. Questions that may be asked include:

   + Who are the people who are travelling?
   + Where have the people travelled from?
   + Do hotter cities see more visitors?  
     - Is this only relative to the city size?
   + Do the demographics of a destination city/state make it more or less popular for visitors?
   + Is there a correlation between temperature at departure city and temperature at destination city?

There are significant limitations and problems with the data provided. I'd recommend using left joins from the fact tables at all times. Specifically, caution should be taken in these circumstances:
   + Joining from fact_demographics to dim_airport. This will work in some cases and the idea is to enhance the immigration data rather than to use the demographic data as a basis for analysis.
   + Joining from fact_temperatures to dim_airport. Again, this will work in some cases and the idea is to enhance the immigration data rather than to use the temperature data as a basis for analysis. Additional caution is necessary here as you are joining on 'city' alone, and the same city name can appear in more than one state. 
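
The city-name caveat can be seen in a toy example. The sketch below uses Python's built-in `sqlite3` module rather than PostgreSQL so that it runs anywhere, keeps only a subset of the columns, and uses rows invented purely for illustration (the airport codes, cities and temperature are not taken from the data). Joining on city alone attaches the same temperature to airports in two different states, while the left join keeps the airport that has no temperature match.

```python
import sqlite3

con = sqlite3.connect(':memory:')
con.executescript("""
create table dim_airport (airport_cd text primary key, city text, state_cd text);
create table fact_temperatures (city text, country text, year int, average_temperature real);
-- invented rows: two airports share a city name but sit in different states
insert into dim_airport values ('ZZ1','Springfield','IL'),('ZZ2','Springfield','MO'),('ZZ3','Boston','MA');
insert into fact_temperatures values ('Springfield','United States',2012,13.1);
""")

rows = con.execute("""
    select a.airport_cd, a.state_cd, t.average_temperature
    from dim_airport a
    left join fact_temperatures t on lower(t.city) = lower(a.city)
    order by a.airport_cd
""").fetchall()
for row in rows:
    print(row)
con.close()
```

Both Springfield airports receive the single Springfield reading, which is why a state column on the temperature table would be needed to make this join reliable.
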

Necessary enhancements that were beyond the scope of this project: 
   - Add state to fact_temperatures. This could be done using the latitude and longitude data in the source data, and is necessary to distinguish between cities with the same name in different states.
   - Thorough cleaning of the airport codes data. This is mostly used 'as is' but it is likely that some of the codes are incorrect.

# Create the schema

Now that we have all of the data in PostgreSQL, let's make some changes to the formats and apply some filtering to get the schema that we want.

In [None]:
#create fact_immigration from staging table. apply format changes to conserve disk space. do some cleaning
exec_psql("""
    drop table if exists fact_immigration;
    create table fact_immigration
    as
    select cast(cicid as int) as unique_id
    ,cast('01-jan-1960' as date) + cast(arrdate as int) as arrival_dt
    ,cast('01-jan-1960' as date) + cast(depdate as int) as departure_dt
    ,fltno as flight_no
    ,cast(i94visa as smallint) as visa_group_cd
    ,cast(i94port as varchar(3)) as airport_cd
    --handle some invalid data
    ,to_date(case when dtaddto = 'D/S' then null
            when dtaddto = '183' then null
            when right(dtaddto,1) = 'D' then null
            else dtaddto end,'MMDDYYYY') as admitted_dt
    ,airline 
    ,cast(i94addr as varchar(2)) as state_cd
    ,cast(coalesce(i94mode,9) as smallint) as arrival_mode_cd --replace missing values with 9, 'Not reported'
    ,entdepa as arrival_flag
    ,entdepd as departure_flag
    ,i94bir as age_on_arrival
    ,biryear as birth_year
    ,cast(i94cit as int) as country_birth_cd
    ,cast(i94res as int) as country_residence_cd
    ,gender as gender
    ,occup as occupation
    ,admnum as adm_num

    from immigration
    ;
    ALTER TABLE fact_immigration ADD PRIMARY KEY (unique_id);
""")

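The date arithmetic in the query above relies on SAS storing dates as a count of days since 1 January 1960. The same conversion, along with the clean-up applied to `dtaddto`, can be sketched in plain Python. The helper names are hypothetical, and `strptime` is stricter than PostgreSQL's `to_date`, so this is an approximation of the SQL rather than an exact equivalent.

```python
from datetime import date, datetime, timedelta

SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(days):
    """Convert a SAS day count (possibly a float, possibly missing) to a date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

def parse_admitted(value):
    """Parse an MMDDYYYY string, returning None for the invalid markers handled in the SQL."""
    if value is None or value in ('D/S', '183') or value.endswith('D'):
        return None
    return datetime.strptime(value, '%m%d%Y').date()

print(sas_to_date(20545.0))        # 2016-04-01
print(parse_admitted('10282016'))  # 2016-10-28
print(parse_admitted('D/S'))       # None
```
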

Create the dim_date table

In [None]:
#dim_date
exec_psql("""
    drop table if exists dim_date;
    create table dim_date
    as
    select distinct 
    arrival_dt as dt
    ,cast(extract(year from arrival_dt) as int) as yr
    ,cast(extract(month from arrival_dt) as int) as mth
    ,cast(extract(day from arrival_dt) as int) as dy
    --get the dates for each date field in the fact
    from (select distinct arrival_dt from fact_immigration 
            union select distinct departure_dt from fact_immigration
            union select distinct admitted_dt from fact_immigration) a
    ;
    delete from dim_date where dt is null or yr < 2000 --some cleaning
    ;
    ALTER TABLE dim_date ADD PRIMARY KEY (dt);
""")

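The logic of the dim_date query (take the distinct dates across the three date columns, drop nulls and pre-2000 values, then split each date into parts) can be sketched in plain Python. The input rows below are invented for illustration and `build_dim_date` is a hypothetical name.

```python
from datetime import date

def build_dim_date(fact_rows):
    """fact_rows: iterable of (arrival_dt, departure_dt, admitted_dt) tuples."""
    # union of the three date columns; a set removes duplicates as UNION does
    dates = {d for row in fact_rows for d in row}
    # mirror the delete statement: drop missing dates and anything before 2000
    kept = sorted(d for d in dates if d is not None and d.year >= 2000)
    return [(d, d.year, d.month, d.day) for d in kept]

# Invented rows: one repeated arrival date, one missing date, one pre-2000 date.
fact_rows = [
    (date(2016, 4, 1), date(2016, 4, 9), date(2016, 10, 1)),
    (date(2016, 4, 1), None, date(1998, 5, 5)),
]
dim_date = build_dim_date(fact_rows)
for row in dim_date:
    print(row)
```
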
Create dims for the SAS look-up tables.

In [None]:
#dim_country

exec_psql("""
    drop table if exists dim_country;
    create table dim_country as 
    select a.country_cd as country_cd
    ,trim(both from a.country) as country

    from (select cast(country_cd as int) as country_cd
            ,country 
            from country_sas)a
    ;
    ALTER TABLE dim_country ADD PRIMARY KEY (country_cd);
""")

In [None]:
""" 
Create dim_airport dimension. We use the SAS data lookup in conjunction with 
the separate airport CSV data in order to get the most complete dataset for the final schema. 
The two datasets don't mesh perfectly and more cleaning could be done.

"""

exec_psql("""
    drop table if exists dim_airport
    ;
    create table dim_airport
    as
    select distinct a.airport_cd
    ,coalesce(b.name,a.airport) as name
    ,coalesce(b.municipality,left(a.airport,position(',' in a.airport)-1)) as city
    ,coalesce(right(b.iso_region,2),
        case when position(',' in a.airport) = length(a.airport)-3 --ensure this looks like a US airport with two letter state code
                then trim(both from right(a.airport,length(a.airport) - position(',' in a.airport)))
                else '-'
                end) as state_cd
    ,coalesce(b.type,'-') as type
    from airport_sas a
    left join airports_iata b on a.airport_cd = b.iata_code 
                            and b.type <> 'closed'
                            and length(b.iso_region)=5 and substr(b.iso_region,1,3) = 'US-' --ensuring we're looking at US airports
                            and right(b.iso_region,2) = right(a.airport,2) --check the state is the same. 
                            --NB. all of these checks because the airport code data in the SAS sources is not always a valid match with
                            --the international standard iata_code
    ;
    ALTER TABLE dim_airport ADD PRIMARY KEY (airport_cd)
    ;
    
""")

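The string handling in the `city` and `state_cd` fallback expressions is easier to follow outside SQL. The sketch below re-expresses it in Python: a label shaped like 'CITY, ST' is split into a city and a two-letter state code, and anything else gets the '-' placeholder for the state. `split_airport` is a hypothetical helper and the labels are examples of the shapes involved, not a claim about specific rows.

```python
def split_airport(label):
    """Return (city, state_cd) from a SAS airport label such as 'CITY, ST'."""
    pos = label.find(',') + 1              # 1-based comma position, 0 when absent
    city = label[:pos - 1] if pos else label
    # a comma three characters from the end means the label ends ', XX'
    if pos and pos == len(label) - 3:
        state_cd = label[pos:].strip()
    else:
        state_cd = '-'
    return city, state_cd

for example in ['ALCAN, AK', 'HAMILTON, BERMUDA', 'NOT REPORTED/UNKNOWN']:
    print(example, '->', split_airport(example))
```

One difference is worth knowing: for a label with no comma, `position` returns 0 and PostgreSQL's `left(text, -1)` returns the text minus its last character, whereas the sketch returns the label unchanged.
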
In [None]:
#dim_arrival_mode
exec_psql("""
    drop table if exists dim_arrival_mode;
    create table dim_arrival_mode
    as
    select distinct cast(trim(both from arrival_mode_cd) as smallint) as arrival_mode_cd
    ,arrival_mode as arrival_mode_loc
    from arrival_mode_sas
    ;
    ALTER TABLE dim_arrival_mode ADD PRIMARY KEY (arrival_mode_cd);
""")

In [None]:
#dim_visa_group
exec_psql("""
    drop table if exists dim_visa_group;
    create table dim_visa_group
    as
    select distinct cast(visa_group_cd as smallint) as visa_group_cd
    ,visa_group
    from visa_group_sas
    ;
    ALTER TABLE dim_visa_group ADD PRIMARY KEY (visa_group_cd);
""")

In [None]:
#dim_us_state
exec_psql("""
    drop table if exists dim_us_state;
    create table dim_us_state
    as
    select distinct cast(us_state_cd as varchar(2)) as us_state_cd
    ,us_state
    from us_state_sas
    ;
    ALTER TABLE dim_us_state ADD PRIMARY KEY (us_state_cd);
""")

In [None]:
#fact_temperatures. NB. stripped out coordinates as too many cases of them being wrong, e.g. where "Latitude" = '50.63N'. 
# for future enhancements, wikipedia and other sites/APIs can provide accurate data.

exec_psql("""
    drop table if exists fact_temperatures
    ;    
    create table fact_temperatures as 
    select distinct a."City" as city
    ,a."Country" as country
    ,cast(substr(a."dt",1,4) as int)  as year
    ,AVG(cast(a."AverageTemperature" as decimal(12,2))) as average_temperature
    from temperatures a
    where 1=1
    and substr(a."dt",1,4) > '1999' --only data from this century
    group by a."City",a."Country",substr(a."dt",1,4)
    ;
    alter table fact_temperatures add primary key(city,country,year)
    ;
    """)

In [None]:
#fact_demographics. type conversions, removal of state name and flattening of the structure. in the source data each "Race"  
# has one row per city, and it makes more sense to have a separate column for each. with larger volumes, one-hot encoding 
# could have been used to achieve this aim too.

exec_psql("""
    drop table if exists fact_demographics
    ;
    create table fact_demographics as 
    select distinct a."City" as city
    ,cast(a."State Code" as varchar(2)) as state_cd
    ,cast(a."Median Age" as decimal(4,2)) as median_age
    ,cast(a."Male Population" as int) as male_population
    ,cast(a."Female Population" as int) as female_population
    ,cast(a."Total Population" as int) as total_population
    ,cast(b."Count" as int) as hispanic_or_latino
    ,cast(c."Count" as int) as white
    ,cast(d."Count" as int) as black_or_african_american
    ,cast(e."Count" as int) as american_indian_and_alaska_native
    ,cast(f."Count" as int) as asian
    ,cast(a."Number of Veterans" as int) as number_of_veterans
    ,cast(a."Foreign-born" as int) as foreign_born
    ,cast(a."Average Household Size" as decimal(12,2)) as average_household_size
    
    from (select distinct a."City"
            ,a."Median Age"
            ,a."Male Population"
            ,a."Female Population"
            ,a."Total Population"
            ,a."Number of Veterans"
            ,a."Foreign-born"
            ,a."Average Household Size"
            ,a."State Code"
            from demographics a) a
            
    left join (select distinct a."City"
                ,a."State Code"
                ,a."Race"
                ,a."Count"
                from demographics a
                where a."Race" = 'Hispanic or Latino') b on a."City" = b."City" and a."State Code" = b."State Code" 

    left join (select distinct a."City"
                ,a."State Code"
                ,a."Race"
                ,a."Count"
                from demographics a
                where a."Race" = 'White') c on a."City" = c."City" and a."State Code" = c."State Code" 

    left join (select distinct a."City"
                ,a."State Code"
                ,a."Race"
                ,a."Count"
                from demographics a
                where a."Race" = 'Black or African-American') d on a."City" = d."City" and a."State Code" = d."State Code" 
    
    left join (select distinct a."City"
                ,a."State Code"
                ,a."Race"
                ,a."Count"
                from demographics a
                where a."Race" = 'American Indian and Alaska Native') e on a."City" = e."City" and a."State Code" = e."State Code" 
                
    left join (select distinct a."City"
                ,a."State Code"
                ,a."Race"
                ,a."Count"
                from demographics a
                where a."Race" = 'Asian') f on a."City" = f."City" and a."State Code" = f."State Code" 
    ;
    alter table fact_demographics add primary key(city,state_cd)
    ;
    """)

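An alternative way to flatten the race rows, shown here only as a sketch and not as the approach used above, is conditional aggregation: one `max(case when ...)` per race, grouped by city and state. The example runs on Python's built-in `sqlite3` with invented rows and only two race columns so that it is self-contained. The SQL pattern itself is standard and should carry over to PostgreSQL.

```python
import sqlite3

con = sqlite3.connect(':memory:')
con.executescript("""
create table demographics ("City" text, "State Code" text, "Race" text, "Count" int);
-- invented rows: one row per city per race, as in the source layout
insert into demographics values
    ('Exampleville', 'XX', 'White', 700),
    ('Exampleville', 'XX', 'Asian', 200),
    ('Sampletown',   'YY', 'White', 50);
""")

flat = con.execute("""
    select "City" as city
    ,"State Code" as state_cd
    ,max(case when "Race" = 'White' then "Count" end) as white
    ,max(case when "Race" = 'Asian' then "Count" end) as asian
    from demographics
    group by "City", "State Code"
    order by city
""").fetchall()
for row in flat:
    print(row)
con.close()
```

This reads the staging table once instead of joining it to itself for each race, and a city with no row for a given race simply gets a null in that column.
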
In [None]:
# Optional: drop the staging tables once the schema has been built.
# exec_psql('drop table "temperatures";')
# exec_psql('drop table "demographics";')
# exec_psql('drop table "airports_iata";')