# Project Title
## Data Engineering Capstone Project

### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here

import time
import json
import numpy as np
import pandas as pd
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id, countDistinct
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
import pyspark.sql.types as T

In [2]:
# read configuration file with AWS credentials
config = configparser.ConfigParser()
config.read('aws.cfg')

# save as environment variables
os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
# define the paths where the raw data is
# Udacity workspace
imm_folder_loc = "../../data/18-83510-I94-Data-2016"
airport_file_loc = "./airport-codes_csv.csv"
demographics_file_loc = "./us-cities-demographics.csv"
temperature_file_loc = "../../data2/GlobalLandTemperaturesByCity.csv"

# local version
# imm_folder_loc = "C:/Users/jlg/aws/dend-capstone-imm-data/data/18-83510-I94-Data-2016"
# airport_file_loc = "./airport-codes_csv.csv"
# demographics_file_loc = "./us-cities-demographics.csv"
# temperature_file_loc = "C:/Users/jlg/aws/dend-capstone-temperature-data/GlobalLandTemperaturesByCity.csv"

# define the destination paths where the data should go, e.g. on S3
# does this have any use with using the CLI? Should I use some kind of !bash command?
dest_bucket = "s3a://udacity-dend-capstone"
dest_imm_key = "immigration/"
dest_airport_key = "airport_codes/"
dest_demographics_key = "demographics/"
dest_temperature_key = "temperature/"

## Step 1: Scope the Project and Gather Data

### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

### Scope of the project
Travely, a US-based touristic tour provider, would like to analyze data of people traveling to the US. They would like to improve their offerings of day-tours and longer guided travels to meet their potential customers' needs. Additionally, they would like to know more about where and when people arrive so they can best advertise accordingly.

For this, they would like to have a data lake including the most relevant data from the data sources listed below.

For this project, I will use Spark to process the data, S3 to store it in parquet format and possibly a database service provided by AWS.

### Describe and Gather Data
I am using the datasets provided by Udacity. These include:

**US immigration data**: A dataset that includes flight passenger data collected at immigration, such as the airport, arrival and departure, birthyear, gender, airline, etc.

**Airport codes**: A dataset about airports, including their international and local codes, country, municipality, coordinates etc.

**City temperature data**: A dataset about temperature in global cities, including data from the 18th to the 21st century, such as city, country, coordinates, temperature and temperature uncertainty.

**US cities demographic**: A dataset about US cities, including the state, total population, and other factors such as average household size.

## Step 2: Explore and Assess the Data
### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

### Cleaning Steps
Document steps necessary to clean the data

#### Immigration Data

Here we work with Spark, which has a package supporting the SAS format (.sas7bdat). Spark is a bit slow, so when I ran the cells below, each one took up to 30 s.

(In this case, pandas was a lot slower, so I chose Spark, although pandas also has an option to read in SAS files.)

In [4]:
spark = SparkSession.builder \
        .config("spark.jars.packages",\
                "org.apache.hadoop:hadoop-aws:2.7.1,com.amazonaws:aws-java-sdk:1.7.4,saurfang:spark-sas7bdat:2.0.0-s_2.11") \
        .enableHiveSupport() \
        .getOrCreate()
print("SparkSession created for Immigration data")

df_imm =spark.read.format('com.github.saurfang.sas.spark')\
        .load(os.path.join(imm_folder_loc, 'i94_apr16_sub.sas7bdat'))
print("Immigration data read into a Spark DataFrame")


SparkSession created for Immigration data
Immigration data read into a Spark DataFrame


In [5]:
%time df_imm.head(5) # wall time 5-10s

CPU times: user 3.83 ms, sys: 4.23 ms, total: 8.06 ms
Wall time: 5.33 s


[Row(cicid=6.0, i94yr=2016.0, i94mon=4.0, i94cit=692.0, i94res=692.0, i94port='XXX', arrdate=20573.0, i94mode=None, i94addr=None, depdate=None, i94bir=37.0, i94visa=2.0, count=1.0, dtadfile=None, visapost=None, occup=None, entdepa='T', entdepd=None, entdepu='U', matflag=None, biryear=1979.0, dtaddto='10282016', gender=None, insnum=None, airline=None, admnum=1897628485.0, fltno=None, visatype='B2'),
 Row(cicid=7.0, i94yr=2016.0, i94mon=4.0, i94cit=254.0, i94res=276.0, i94port='ATL', arrdate=20551.0, i94mode=1.0, i94addr='AL', depdate=None, i94bir=25.0, i94visa=3.0, count=1.0, dtadfile='20130811', visapost='SEO', occup=None, entdepa='G', entdepd=None, entdepu='Y', matflag=None, biryear=1991.0, dtaddto='D/S', gender='M', insnum=None, airline=None, admnum=3736796330.0, fltno='00296', visatype='F1'),
 Row(cicid=15.0, i94yr=2016.0, i94mon=4.0, i94cit=101.0, i94res=101.0, i94port='WAS', arrdate=20545.0, i94mode=1.0, i94addr='MI', depdate=20691.0, i94bir=55.0, i94visa=2.0, count=1.0, dtadfile=

In [6]:
df_imm =spark.read.format('com.github.saurfang.sas.spark')\
        .load(os.path.join(imm_folder_loc, 'i94_jan16_sub.sas7bdat'))
print("jan loaded")
months = ["feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"]
imm_columns = df_imm.columns

# For some reason, the file for June has more columns, all starting with 'delete', probably deprecated.
# I will thus select only the columns present in all of the dataframes.
for month in months:
    df_imm_next = spark.read.format('com.github.saurfang.sas.spark')\
                        .load(os.path.join(imm_folder_loc, f'i94_{month}16_sub.sas7bdat'))
    print(f"Immigration for {month} loaded")
    df_imm = df_imm.union(df_imm_next[imm_columns])
    print(f"Immigration for {month} appended")

print("Immigration: All months loaded")

jan loaded
Immigration for feb loaded
Immigration for feb appended
Immigration for mar loaded
Immigration for mar appended
Immigration for apr loaded
Immigration for apr appended
Immigration for may loaded
Immigration for may appended
Immigration for jun loaded
Immigration for jun appended
Immigration for jul loaded
Immigration for jul appended
Immigration for aug loaded
Immigration for aug appended
Immigration for sep loaded
Immigration for sep appended
Immigration for oct loaded
Immigration for oct appended
Immigration for nov loaded
Immigration for nov appended
Immigration for dec loaded
Immigration for dec appended
Immigration: All months loaded


Let's see how many rows we have in this dataset (result: 40,790,529 rows). There are no duplicates (number of rows stays the same when dropping duplicates).

In [7]:
# %time df_imm.count() # wall time 8min 50s - result: 40,790,529 rows

In [8]:
df_imm = df_imm.dropDuplicates()
# %time df_imm.count() # wall time 13min 38s - result: 40,790,529 rows

In [9]:
# select only people who came by plane -> i94mode==1
df_imm = df_imm[df_imm['i94mode']==1]

# %time df_imm.count() # -> reduced to 39,166,088
# wall time 7min 58s

In [10]:
df_imm = df_imm.dropna(subset=['i94yr', 'i94mon', 'i94port', 'arrdate', 'i94bir']) # wall time < 1s

# %time df_imm.count() # wall time 7min 53s - result: 39,1458,783 rows

In [11]:
# select the most relevant columns (explain why)
%time df_imm = df_imm['i94yr', 'i94mon', 'i94port', 'i94addr', 'i94visa', 'arrdate', 'depdate', 'biryear', 'gender', 'visatype']
# wall time < 1s

CPU times: user 5.55 ms, sys: 256 µs, total: 5.81 ms
Wall time: 65.5 ms


In [12]:
# convert dates and add+compute duration column




In [None]:
# rename columns, e.g. i94yr -> year, i94port -> airport_code
# change this in the write task below as well

In [None]:
# write data to parquet files
# Here I chose to write the parquet files locally and then use the aws cli to upload them to the S3 bucket.
# Time comparison: 20 min writing + < 1min uploading 
# vs. writing directly to S3: not finished even after 6 hours

%time df_imm.write \
            .partitionBy('i94mon').mode('overwrite') \
            .parquet("./imm_data/")

# maybe partition also by 'i94addr' ?
# omit partition by i94yr for this dataset since it is only 2016, BUT INCLUDE THIS REASON IN THE WRITEUP

include details about cost and my choices

The I94 immigration data also had some annotations in the file I94_SAS_Labels_Descriptions.SAS. From this, I have created the respective JSON files (such as i94addr_desc.json, in the desc folder), which will generate more tables for the final data model. (This is not included as code since I had to manually fix the files at some points.) Joining the immigration data and these tables will give more information about the encoded information, such as the state/country the immigrating person comes from.

In [None]:
# sync description json files to S3
!aws s3 sync ./desc/ s3://udacity-dend-capstone/desc/ --exclude "*ipynb*" --only-show-errors
print("Immigration description data: json files uploaded to S3")

In [None]:
# upload immigration parquet files to S3
!aws s3 sync ./imm_data/ s3://udacity-dend-capstone/immigration/ --only-show-errors
    # os.path.join(dest_bucket, dest_imm_key)
print("Immigration data: parquet files uploaded to S3")

In [None]:
print("Immigration data: PROCESSING FINISHED")

#### Temperature Data

Since the raw data is in CSV format, I will use pandas for the exploration and cleaning because it is faster and provides an easy overview through its automatic table-formatting. Each step with pandas should take under 10 s.

To write the data to parquet files in S3, I will use Pyspark - unlike pandas, it does not require additional dependencies for that task.

In [None]:
%time pd_temperature = pd.read_csv(temperature_file_loc)

In [None]:
print(pd_temperature.shape)
pd_temperature.head(10)

Here we can already see there are some NaN values in the dataset. In the next step, we will determine how many values in which column are NaN.

In [None]:
pd_temperature.isnull().sum()

As we can see, only AverageTemperature and AverageTemperatureUncertainty have NaN values, and they have the same number of NaN values.
Thus, we assume they are always either both NaN or both have a non-null value (as seen above in the first few lines of data).

In the next step, we will drop the NaN values from the dataframe and verify there are no more NaN values in it.

In [None]:
pd_temperature = pd_temperature.dropna()
print(pd_temperature.isnull().sum())
pd_temperature.head(10)

Since Travely currently only operates in the US, we will select the temperature values for Country='United States', and also just include data after 2000.

In [None]:
pd_temperature = pd_temperature[pd_temperature['Country']=='United States']
pd_temperature = pd_temperature[pd_temperature['dt']>='2000-01-01']
print(pd_temperature.shape)
pd_temperature.head(10)

Before writing the table to file, I will rename the columns so they are a bit shorter (in the case of the temperature columns) and match the usual snake-case convention (e.g. AverageTemperature -> avg_temperature).

In [None]:
pd_temperature.rename(columns = {"AverageTemperature": "avg_temperature",
                                "AverageTemperatureUncertainty": "avg_temperature_uncert",
                                "City": "city",
                                "Country": "country",
                                "Latitude": "latitude",
                                "Longitude": "longitude"},
                     inplace = True)
pd_temperature.head(10)

Finally, we write the data into parquet files using Spark - after a small data quality check. The transformation of the pandas DataFrame to the Spark DataFrame, the file writing, and the S3 upload should each only take a few seconds.

To avoid high charges for S3 LIST etc. operations and to make the process faster, I will first write the files to the workspace and then copy them to the S3 bucket.

In [None]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages",\
                "org.apache.hadoop:hadoop-aws:2.7.1,com.amazonaws:aws-java-sdk:1.7.4,saurfang:spark-sas7bdat:2.0.0-s_2.11") \
        .getOrCreate()

df_temperature = spark.createDataFrame(pd_temperature)

In [None]:
# check that dataframe is not empty
if df_temperature.head(1) != 0:
    print("Data Quality Check: data frame not empty, passed")
else:
    print("DATAFRAME EMPTY")

# check that there are multiple cities and dates
if df_temperature.groupby("city").count().head(1) != 0 :
    print("Data Quality Check: multiple cities, passed")
else:
    print("Data Quality Check FAILED: missing cities")

if df_temperature.groupby("dt").count().head(1) != 0:
    print("Data Quality Check: multiple and dates, passed")
else:
    print("Data Quality Check FAILED: missing or dates")

In [None]:
%time df_temperature.write \
                .partitionBy('city') \
                .mode('overwrite') \
                .parquet(os.path.join(".", dest_temperature_key))

In [None]:
# upload files to S3 using the AWS CLI
!aws s3 sync ./temperature/ s3://udacity-dend-capstone/temperature/ --only-show-errors
print("Temperature data: uploaded to S3")

In [None]:
print("Temperature data: PROCESSING FINISHED")

#### Airport Data

In [None]:
pd_airport = pd.read_csv(airport_file_loc, delimiter=",")
pd_airport.head(10)

In [None]:
pd_airport.shape

In [None]:
pd_airport.info()

Since in the immigration data only airports with an international airport code are given (also known as IATA code), we will remove any rows without an IATA code from the airport table. Then, we will check how many rows are left in our dataset.

In [None]:
pd_airport = pd_airport.dropna(subset=["iata_code"])
pd_airport.shape

For joining with the immigration table, columns like ident, local_code, gps_code, and continent are not as relevant, thus we will drop them.

In [None]:
pd_airport.drop(columns = ["ident", "local_code", "continent", "gps_code"], inplace=True)
pd_airport.head(5)

Having the coordinates together in a tuple is not very convenient. Two double-type columns latitude and longitude seem more beneficial for data analysis.

In [None]:
#pd_airport['coordinates'].map(eval()).head(5)
#pd_airport.head(5)
type(pd_airport.coordinates[1088])
#pd_airport['latitude'] = pd_airport['coordinates'][0]
pd_airport[['latitude', 'longitude']] = pd.DataFrame(pd_airport['coordinates'].tolist(), index=pd_airport.index)
pd_airport.drop(columns=['coordinates'], inplace=True)
pd_airport.head(5)

To make joins more intuitive, we will rename some of the columns.

In [None]:
pd_airport.rename(columns = {"type": "airport_type",
                            "name": "airport_name",
                            "iata_code": "airport_code"},
                 inplace = True)
pd_airport.head(5)

In [None]:
# data quality check: relevant columns do not contain null/NaN
if (pd_airport.isna().sum()["airport_name"] == 0) and (pd_airport.isna().sum()["airport_code"] == 0):
    print("Data Quality Check - passed: No missing airport names or airport codes")
else:
    print("Data Quality Check - FAILED: Missing airport names or airport codes")

The last step is to write this data to files and copy these to S3, using Spark.

In [None]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages",\
                "org.apache.hadoop:hadoop-aws:2.7.1,com.amazonaws:aws-java-sdk:1.7.4,saurfang:spark-sas7bdat:2.0.0-s_2.11") \
        .getOrCreate()

schema = T.StructType([T.StructField("airport_type", T.StringType()),
                      T.StructField("airport_name", T.StringType()),
                      T.StructField("elevation_ft", T.DoubleType()),
                      T.StructField("iso_country", T.StringType()),
                      T.StructField("iso_region", T.StringType()),
                      T.StructField("municipality", T.StringType()),
                      T.StructField("airport_code", T.StringType()),
                      T.StructField("latitude", T.DoubleType()),
                      T.StructField("longitude", T.DoubleType())])

df_airport = spark.createDataFrame(pd_airport, schema)

In [None]:
%time df_airport.write \
                .partitionBy('iso_country') \
                .mode('overwrite') \
                .parquet(os.path.join(".", dest_airport_key))

In [None]:
# upload files to S3 using the AWS CLI
!aws s3 sync ./airport_codes/ s3://udacity-dend-capstone/airport_codes/ --only-show-errors
print("Airport data: uploaded to S3")

#### Demographics Data

In [None]:
pd_demographics = pd.read_csv(demographics_file_loc, delimiter=";")
pd_demographics.head(10)

In [None]:
pd_demographics[pd_demographics['City'] == "Silver Spring"].head(20)

In [None]:
pd_demographics.shape

In [None]:
print(pd_demographics.isna().sum())
pd_demographics.dropna(inplace = True)
print(pd_demographics.isna().sum())

Race is not relevant in any way for Travely, so the colums Race and Count can be omitted. The information in the other columns is the same for any "Race", so no aggregations are necessary here.

In [None]:
pd_demographics.drop(columns = ["Race", "Count"], inplace = True)
pd_demographics.drop_duplicates(inplace = True)
print(pd_demographics.shape)
pd_demographics.head(5)

In [None]:
pd_demographics.rename(columns = {"City": "city",
                                  "State": "state",
                                  "Median Age": "median_age",
                                  "Male Population": "male_population",
                                  "Female Population": "female_population",
                                  "Total Population": "population",
                                  "Number of Veterans": "veterans",
                                  "Foreign-born": "foreign_born",
                                  "Average Household Size": "avg_household_size",
                                  "State Code": "state_code"},
                      inplace = True)
pd_demographics.head(5)

In [None]:
pd_demographics.dtypes

In [None]:
# people occur only in integers -> change type to int64 for columns male_population, female_population, veterans, foreign_born
pd_demographics = pd_demographics.astype({"male_population": "int64",
                                          "female_population": "int64",
                                          "veterans": "int64",
                                          "foreign_born": "int64"})

In [None]:
pd_demographics.dtypes

Data Quality Checks

We would like the following conditions to be true:

population = male_population + female_population
population > foreign_born
population > avg_household_size
male_population > veterans

In [None]:
errors = 0

for index, row in pd_demographics.iterrows():
    if (row.population < (row.male_population + row.female_population)):
        # since some people might self-identify as neither male or female, there could be more, but not less people than males+females
        errors += 1
print(f"Data quality issues in population count: {errors}")

errors = 0
for index, row in pd_demographics.iterrows():
    if row.population < row.foreign_born:
        errors += 1
print(f"Data quality issues in foreign_born: {errors}")
errors = 0

for index, row in pd_demographics.iterrows():
    if row.population < row.avg_household_size:
        errors += 1
print(f"Data quality issues in avg_household_size: {errors}")
errors = 0

for index, row in pd_demographics.iterrows():
    if row.male_population < row.veterans:
        errors += 1
print(f"Data quality issues in veterans: {errors}")


In [None]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages",\
                "org.apache.hadoop:hadoop-aws:2.7.1,com.amazonaws:aws-java-sdk:1.7.4,saurfang:spark-sas7bdat:2.0.0-s_2.11") \
        .getOrCreate()

In [None]:
df_demographics = spark.createDataFrame(pd_demographics)

%time df_demographics.write \
                .partitionBy('state_code') \
                .mode('overwrite') \
                .parquet(os.path.join(".", dest_demographics_key))

In [None]:
# upload files to S3 using the AWS CLI
!aws s3 sync ./demographics/ s3://udacity-dend-capstone/demographics/ --only-show-errors
print("Demographics data: uploaded to S3")

### Cleaning the Data

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.