# Immigration and climate patterns in major US cites
### Data Engineering Capstone Project

## Project Summary
In this project we combine demographic, immigration and climate data for the major cities of the United States. The data are first transformed with Spark and then loaded via an S3 bucket to an Amazon Redshift cluster that acts as a warehouse for analysts. The data will allow detailed analsys of residency and citizenship of visitors to various major cities in the United States, the most important airlines in terms of passenger numbers and the relationship between visitor numbers, local climate and the demographics of the US cities.

In [None]:
# =============================================================================
# Install libraries
# =============================================================================
!pip uninstall -y numpy
!pip install numpy==1.16.1
!pip install fiona
!pip install beautifulsoup4
!pip install shapely
!pip install pyspark
!pip install pyarrow
!pip install ipython-sql

In [None]:
# =============================================================================
# Load libraries
# =============================================================================
import os
import re
import pandas as pd

# Pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

from tempfile import NamedTemporaryFile
from zipfile import ZipFile
from shutil import rmtree, unpack_archive

## Scope of the project and data sources

### Scope of project 

The project is designed to provide a mixed data set that will allow a team of data analysts/scientists to explore the relationship between climate, demographics and non-immigrant visitor patterns to various major US cities, including the origin of the visitors and the main airlines used for each city.

### Description of the data sources

The core data that we use comes from the three official sources. The first and largest one comes from an US government website giving a breakdown of international visitors to an individual visitor level at each port of entry with many interesting meta data relating to the visitor such as gender, type of visa and country of origin.  
The second source is monthly temperature data for major cities around the world starting about 1750. This dataset is provided by Kaggle and originates from the Lawrence Berkeley National Laboratory.  
The final and smallest dataset is hosted on opendatasoft.com and contains demographic information of US cities and census-designated places with a population greater or equal to 65,000. This data orignated from the US Census Bureau's 2015 American Community Survey.

For the project we will mainly use these three data sets, augmented with some further data from public sources to help join the main data tables where this is possible. 

### Taking a look at the data

At this stage we will not yet load the whole data where possible but just enough to do some exploratory data analysis.

In [None]:
# Start Spark Session
# =============================================================================
spark = SparkSession \
    .builder \
    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .config("spark.sql.execution.arrow.enabled", "false") \
    .enableHiveSupport().getOrCreate()

In [None]:
# =============================================================================
# Load data
# =============================================================================
demographics = spark \
    .read \
    .csv('us-cities-demographics.csv', sep=';', header='true')

immigration_sample = spark \
    .read \
    .csv('immigration_data_sample.csv', sep=',', header='true')

climate = spark \
    .read \
    .csv('../../data2/GlobalLandTemperaturesByCity.csv', header='true')

#### Demographics

In [None]:
demographics.show(5)

#### Immigration

In [None]:
immigration_sample.show(5)

#### Climate

In [None]:
climate.show(5)

### Step 2: Exploration and Assement of the data

Before going on we have a look at the data to find out if there are data quality issues 
and how easy or difficult it will be to connect the data through their columns and if 
we need to engineer any extra features.

We shall start with the three main tables for immigration, climate and demographics:

In [None]:
# Let us have a closer look at the demographics table first.
demographics.columns

In [None]:
demographics.describe().show(vertical=True)

In [None]:
# Let's count the number of records.
demographics.count()

In [None]:
# And now the number of cities represented in the dataset
demographics.select(F.countDistinct('City', 'State')).show()

In [None]:
# Let's see some of the cities represented
demographics.select('City').distinct().show(20)

In [None]:
demographics.select('Race').distinct().show(truncate=False)

So there are four times as many records as there are cities. We also noticed earlier that the dataframe has a column `Total Population` and columns `Race` and `Count`. So we could unstack the table by pivoting the `Race` and `Count` columns so that each race description forms its own column with the number of people in each racial group as the value. We should then end up with a dataframe that contains
596 rows in 15 columns.

I would also note that the dataset has a `State`, `State Code` and `City` column that might be good carry out SQL-joins in Redshift.

In terms of cleaning, the data looks already good and probably does not need any further cleaning steps. Either the `State` or `State Code` column could possibly be removed as either column is sufficient for joins and a separate lookup table for states and state codes is already available and there are no missing values there. There are up to 16 missing values in the other columns. However, out of 2891 columns overall the percentage is very small and these missing values are not critical. 

In [None]:
# Let us move on to immigration and look first at the columns
immigration_sample.columns

In [None]:
immigration_sample.show(5)

In [None]:
# And now some summary statistics
immigration_sample.describe().show(vertical=True)

# Clean up sample data. We won't need this anymore
del immigration_sample

From the output above and the immigration meta-data file we can see that not all columns are useful. I would aggregate the data on port of entry and year and month of arrival. I would aggregate gender, country of origin, country of residency, length of stay (calculated from arrival and departure dates), age, visa type, and the airline used and drop the rest of the columns as many are mostly `null` anyway.
Looking at the description data it is obvious that many columns need to be combined with the meta-data to be decoded . We will use the ports of entry, and country of origin information from the meta-data file for that. For airlines we will have to find a look-up table to convert the airline codes to names.

We will also have to find a way of effectively storing the data on country of origin and country of residency as we won't be able to store them as separate columns in the dataframe (there would be more than 200 columns otherwise!).

Since both the demographics and the immigration data are best organised by location (city and state code or state) it makes sense to consider that as their potential joining columns.

In [None]:
# Let's move on to the climate data
climate.columns

In [None]:
climate.show(5)

In [None]:
climate.describe().show()

The climate data is again organised by location and date. The timestamps seem to be monthly, starting from 1743 and ending in 2013. So there seems no overlap with the immigration data which is encompassing the whole of 2016. I would suggest to keep the last 50 years from 1963 to 2013 as this would be sufficient for an analysis of average temperatures and trends.  

The locations include places inside and outside of the United States and I would drop rows with the latter. The `city` column does not include information on the US state or territory which could lead to mix-ups for city names that exist in more than one US state. However, we have information on latitude and longitude that we can use to engineer this feature.  

There are some missing values and we will need to filter these out.  

Again the climate data contains location data and monthly time points and that would be the best columns to carry out joins with the demographics and immigration data set.

### Creating the meta-data

To make effective use of the data above we have to find some extra information. Some of this is readily stored in the meta-data file for the immigration data set. However, we need some external lookup table to translate the airline codes to airline names and we need a shapefile to convert the latitude-longitude pairs in the climate data to US states and territories. So before we go on we gather these.

In [None]:
# =============================================================================
# Extract immigration meta data
# =============================================================================

! mkdir meta_data

# Define functions 
def clean_text(text):
    cleaned_text = re.sub("^[ ]*'|[ ]*'[ ]*$", "", text)
    if re.sub("[ ]*", "", cleaned_text).isnumeric():
        return int(cleaned_text.strip())
    else:
        return cleaned_text.strip()


def convert_to_dictionary(large_string):
    list_of_strings = [x for x in large_string.split('\n')]
    list_of_strings = [x for x in list_of_strings if x != '']
    list_of_strings = [list(x.split("=")) for x in list_of_strings]
    return {ind: [clean_text(x[0]), clean_text(x[1])] for ind, x
            in enumerate(list_of_strings)}


def flatten(list_of_lists):
    if len(list_of_lists) == 0:
        return list_of_lists
    if isinstance(list_of_lists[0], list):
        return flatten(list_of_lists[0]) + flatten(list_of_lists[1:])
    return list_of_lists[:1] + flatten(list_of_lists[1:])

with open('I94_SAS_Labels_Descriptions.SAS', 'r') as conn:
    description = conn.read()

# Split text into list of elements
description_list = description.split('\n\n')
# Next we remove further elements that are not useful and split the text into
# a list of strings.
description_list = [x for x in description_list if re.search('[ ]?value ', x)]
description_list = [re.sub(r'/\*[^\*]*\*/[\n]?', '', x)
                    for x in description_list]
description_list = [re.sub(r'[\s]?value.*\n', '', x) for x in description_list]
description_list = [re.sub('[\n| ];', '', x) for x in description_list]
description_list = [re.sub('\t', '', x) for x in description_list]

# =============================================================================
# Transform the individual tables and write to disk
# =============================================================================

# Carry out the split
countries, _, _, us_states = description_list

# Convert the different text elements to dataframes and clean them up
# and finally write them to disk
countries = convert_to_dictionary(countries)
countries = pd.DataFrame.from_dict(countries, orient='index',
                                columns=['country_code', 'country_name'])
countries = countries[~countries['country_name'].str.contains(
    "INVALID|No Country Code|Collapsed", case=False)]
countries['country_name'] = countries['country_name'].str.title() \
    .str.replace(r'\([^\(\)]*\)', '') \
    .str.strip()

countries.to_csv(os.path.join(os.getcwd(), 'meta_data/countries.csv'),
              index=False)

# countries = spark.createDataFrame(countries)
countries = spark \
    .read \
    .csv('meta_data/countries.csv', sep=',', header='true')

# Same again for the lookup-table for US states
us_states = convert_to_dictionary(us_states)
us_states = pd.DataFrame.from_dict(us_states, orient='index',
                                   columns=['state_code', 'state'])

us_states = us_states[(us_states['state_code'] != 99)]
us_states['state'] = us_states['state'].str.replace(r'N\.', 'NORTH') \
    .str.replace(r'S\.', 'SOUTH') \
    .str.replace(r'W\.', 'WEST') \
    .str.replace(r'DIST\.', 'DISTRICT') \
    .str.title()
us_states.to_csv('meta_data/us_states.csv', index=False)

# us_states = spark.createDataFrame(us_states)
us_states = spark \
    .read \
    .csv('meta_data/us_states.csv', sep=',', header='true')

# Clean up
del description, description_list, _, clean_text, convert_to_dictionary, flatten

The meta-data in the immigration meta-data file is not helpful to pin immigration ports to a certain city and this will need to be done by hand and then be ingested as a separate file. Doing this will also allow for the fact that some larger cities (notably New York City) have more than one international airport serving them.

In [None]:
ports_of_entry = spark \
    .read \
    .csv('port_city_lookup.csv', sep=',', header='true')

In [None]:
# =============================================================================
# Download lookup table of airlines codes and names
# =============================================================================
# import libraries
import requests
from bs4 import BeautifulSoup

# Define functions
def extract_data(response):
    html = response.text
    soup = BeautifulSoup(html, 'html5lib')
    table = soup.find("table")
    return pd.read_html(str(table))[0]

# Load data
airlines_url = 'https://www.bts.gov/topics/airlines-and-airports/airline-codes'

response = requests.get(airlines_url)
if response.status_code == 200:
    airline_codes = extract_data(response)

# Clean data
airline_codes = airline_codes \
    .rename(str.lower, axis='columns') \
    .rename({'airline': 'airline_name'}, axis='columns') \
    .loc[lambda df: ~df['code'].str.contains(r'\([0-9]\)', na=False), ]

# Write data to file
airline_codes.to_csv('meta_data/airlines.csv', index=False)

# Convert to PySpark dataframe
# airline_codes = spark.createDataFrame(airline_codes)
airline_codes = spark \
    .read \
    .csv('meta_data/airlines.csv', sep=',', header='true')

# Clean up
del airlines_url, response, extract_data

In [None]:
# =============================================================================
# Download shapefiles
# =============================================================================
# import libraries
from urllib.request import urlopen
import fiona
from shapely.geometry import shape, Point, Polygon
!mkdir shapefiles

# Download shape file for climate data
shapefile_url = 'https://www2.census.gov/geo/tiger/GENZ2019/shp/cb_2019_us_state_' \
       '500k.zip'
with urlopen(shapefile_url) as zipresp, NamedTemporaryFile() as tfile:
    tfile.write(zipresp.read())
    tfile.seek(0)
    unpack_archive(tfile.name, './shapefiles', format = 'zip')
    
del shapefile_url

state_polygons = fiona.open('shapefiles/cb_2019_us_state_500k.shp')

### Let's briefly look at the meta-data

#### Ports of entry lookup-table

In [None]:
ports_of_entry.show(5)

#### Countries of origin lookup-table

In [None]:
countries.show(5)

#### US state lookup_table

In [None]:
us_states.show(5)

#### Airlines lookup-table

In [None]:
airline_codes.show(5)

#### Geographic information on US states and territories

In [None]:
from IPython.display import Image
Image(filename='usa.png') 

The shapefile with the state and territories of the United States shown above 
can be used to determine the state in which a geographic point given in latitude-longitude
coordinates is located. Plotting this is currently not possible on the server as it 
requires larger system packages to be installed. So in this instance the image was
rendered in QGIS and then imported.

The meta-data looks all good. 

## Step 3: The Data Model
### 3.1 Conceptual Data Model
The data will be stored in a relational database and we therefore need to model how the different tables can be joined with each other and if there are any foreign-key relationships.

#### Structure of the final dataset

In [None]:
Image(filename='capstone_project.png') 

The final dataset will be made up of five tables:  
The __city_demographics__ table contains the city demographics such as population numbers by gender and ethnicity for each major city in the US.  
The __city_climate__ table contains the monthly average temperature for each city and is linked to the __city_demographics__ table by the _city_ and _state_ columns. However, they only partially overlap. 

The third table, __city_visitors__, contains the the monthly aggregated number of overseas visitors for each city and is connected to the __city_demographics__ table by the _city_ and _state_ columns and to the __city_climate__ table by _city_, _state_, _year_, and _month_ columns. But again these sets are not overlapping, as the climate dataset only goes to 2013 and the visitor dataset only contains the year 2016. Also the cities are only parially overlapping. However, assuming that this is a pipleine where both datasets will be available for each month, it makes sense to keep the dataset as a timeline rather than to take the temperature mean for each city. 

The __airlines__ table contains the full names of the airlines and the number of passengers per _city_+_state_+_year_+_month_ combination carried by an airlines. As it is derived from the same overseas visitors dataset these four columns directly overlap. 

Last but not least the __country_of_origin__ table contains the number of residents and citizen from each country visiting a city each month. Similar to the __airlines__ table it is derived from the overseas visitors dataset. 

As is easily discernable the common link between the five tables are the _city_ and _state_ columns. However, as the datasets did not contain all the same sets of cities there is only limited overlap. Four out of the five tables also have _year_ and _month_ columns but again only __city_visitors__, __airlines__ and __country_of_origin__ share the same time-range. However, there is still enough overlap of keys and information to gleen interesting insights, especially about the origin of visitors to the different cities, and which airlines are the biggest carriers to each city.

All the tables will already be denormalised and there is no formal fact-table-dimension-table relationship. However, the table can be joined to form more elaborate queries.

#### Treating the demographics table

Exploring the cities dataset showed that the table was structed as a 'stacked' table. As such several rows referred to the same city but contained the different population groups as separate rows. To make the table more easy to deal with we will have to 'unstack' it by carrying out a pivot operation.

### 3.2 The necessary data pipelines
To process the data with maximum performance we will first carry out the transformations on the local machine before loading the resulting files to Redshift. As the data is over 6GB it would not be sensible to carry out the aggregation at a later stage.
The aggregation will be carried out with PySpark with the aggregation columns being _state_, _city_, _year_, and _month_, _airline_code_ and _country_of_origin_ where they are available. This should reduce the size of the dataset considerably.
We then create an S3 bucket and a Redshift Cluster. We create the empty tables in the Redshift cluster, load the data to the 
S3 bucket and finally copy the data from the S3 bucket to the Redshift cluster. 

## Step 4: Run Pipelines to Model the Data 
### 4.1 Creating the data model
Now we run the code that will create the staging tables.

In [None]:
!mkdir staging

#### Load the full immigration dataset into Spark

In [None]:
immigration_path = '../../data/18-83510-I94-Data-2016'
immigration_files = os.listdir(immigration_path)
immigration_files = [os.path.join(immigration_path, f)
                     for f in immigration_files]

def union_all(dfs):
    if len(dfs) > 1:
        return dfs[0].unionAll(union_all(dfs[1:]))
    else:
        return dfs[0]

immigration = [spark.read.format('com.github.saurfang.sas.spark').load(f)
               for f in immigration_files]

keep_cols = ['i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate', 'i94mode',
             'depdate', 'i94bir', 'i94visa', 'gender', 'airline', 'admnum', 'count']

immigration = [df.select(*keep_cols) for df in immigration]
immigration = union_all(immigration)

#### Combine immigration data with the prepared meta data sets and write to disk

In [None]:
# =============================================================================
# Define functions
# =============================================================================
def cnt_cond(cond):
    return F.sum(F.when(cond, 1).otherwise(0))

# =============================================================================
# Define data transformations in Spark
# =============================================================================
immigration = immigration \
    .dropna(how='any', subset='i94mode') \
    .filter(F.col('i94mode') ==  1) \
    .dropDuplicates(['admnum']) \
    .withColumn('year', F.col('i94yr').cast(IntegerType())) \
    .withColumn('month', F.col('i94mon').cast(IntegerType())) \
    .withColumn('stay', F.abs(F.col('depdate') - F.col('arrdate'))) \
    .withColumn('stay', F.col('stay').cast(IntegerType())) \
    .withColumn('i94cit', F.col('i94cit').cast(IntegerType())) \
    .withColumn('i94res', F.col('i94res').cast(IntegerType())) \
    .join(F.broadcast(ports_of_entry), F.col('i94port') == ports_of_entry.port_code,
          how='inner') \
    .join(F.broadcast(us_states), on=['state_code'], how='inner') \
    .select('city', 'state', 'year', 'month', 'i94cit', 'i94res', 'i94bir',
            'i94visa', 'count', 'gender', 'stay', 'airline')

In [None]:
aggregated_immigration = immigration \
    .groupBy(['year', 'month', 'city', 'state']) \
    .agg(F.count('count').alias('total_count'),
         cnt_cond(F.col('gender') == "M").alias('male_count'),
         cnt_cond(F.col('gender') == "F").alias('female_count'),
         cnt_cond(F.col('i94visa') == 1).alias('business_visa_count'),
         cnt_cond(F.col('i94visa') == 2).alias('tourist_visa_count'),
         cnt_cond(F.col('i94visa') == 3).alias('student_visa_count'),
         F.expr('percentile(i94bir, array(0.5))')[0].alias('median_visitor_age'),
         F.expr('percentile(stay, array(0.5))')[0].alias('median_stay')) \
    .selectExpr('city', 'state', 'year', 'month', 'total_count', 'male_count',
            'female_count', 'business_visa_count', 'tourist_visa_count',
            'student_visa_count', 
            'round(median_visitor_age, 1) median_visitor_age',
            'round(median_stay, 2) median_stay')

In [None]:
airlines = immigration \
    .na.drop(subset=['airline']) \
    .withColumn('airline_code', F.regexp_replace(F.col('airline'), "\\*", "")) \
    .groupBy(['city', 'state', 'year', 'month', 'airline_code']) \
    .agg(F.count('count').alias('passenger_number')) \
    .join(F.broadcast(airline_codes), F.col('airline_code') == airline_codes.code, how='left') \
    .select('city', 'state', 'year', 'month', 'airline_code',
            'airline_name', 'passenger_number')

In [None]:
citizenship = immigration \
    .groupBy(['city', 'state', 'year', 'month', 'i94cit']) \
    .agg(F.count('count').alias('citizens_visiting')) \
    .join(F.broadcast(countries), F.col('i94cit') == countries.country_code, how='inner') \
    .select('city', 'state', 'year', 'month', 'country_name',
            'citizens_visiting')

residency = immigration \
    .groupBy(['city', 'state', 'year', 'month', 'i94res']) \
    .agg(F.count('count').alias('residents_visiting')) \
    .join(F.broadcast(countries), F.col('i94res') == countries.country_code, how='inner') \
    .select('city', 'state', 'year', 'month', 'country_name',
            'residents_visiting')

visitors_by_country = citizenship \
    .join(residency,  on=['year', 'month', 'city', 'state', 'country_name'],
          how='outer') \
    .na.drop(subset=['country_name']) \
    .fillna(0, subset=['citizens_visiting', 'residents_visiting']) \
    .select('city', 'state', 'year', 'month', 'country_name',
            'citizens_visiting', 'residents_visiting')

In [None]:
# =============================================================================
# Write Spark dataframes to disk
# =============================================================================
aggregated_immigration \
    .repartition('state') \
    .write \
    .csv('staging/immigration/',
         mode='overwrite')

del aggregated_immigration

In [None]:
airlines \
    .repartition('state') \
    .write \
    .csv('staging/airlines/',
         mode='overwrite')

del airlines, airline_codes

In [None]:
visitors_by_country \
    .repartition('state') \
    .write \
    .csv('staging/countries/',
         mode='overwrite')

del residency, citizenship, visitors_by_country, immigration

#### Transform and write demographic data to staging folder

In [None]:
# =============================================================================
# Sort out column names
# =============================================================================
# Column names in the demogrpahic data table is upper case with spaces and
# hyphens as word separators. We have to clean this before we can continue.
demographics_columns = demographics.columns
new_names = [re.sub("[ -]", "_", col).lower() for col in demographics_columns]

demographics = demographics.toDF(*new_names)

# =============================================================================
# Pivot and transform table
# =============================================================================
column_match = [re.match("race|count", city) is None
                for city in demographics.columns]

index_cols = [name for name, match in
              zip(demographics.columns, column_match) if match]

demographics = demographics.groupBy(index_cols) \
    .pivot("race") \
    .agg(F.sum('count')) \
    .drop('state_code') \
    .withColumnRenamed('median_age', 'median_population_age') \
    .dropna(how='any', subset=('state', 'city')) \
    .fillna(0) \
    .selectExpr('city', 'state',
                'cast(male_population as int) male_population',
                'cast(female_population as int) female_population',
                'cast(total_population as int) total_population',
                'cast(number_of_veterans as int) veterans',
                'cast(foreign_born as int) foreign_born',
                'cast(`American Indian and Alaska Native` as int) '
                'native_american',
                'cast(Asian as int) asian',
                'cast(`Black or African-American` as int) black',
                'cast(`Hispanic or Latino` as int) latino',
                'cast(White as int) white',
                'median_population_age', 'average_household_size',)


In [None]:
# =============================================================================
# Write Spark dataframe to disk
# =============================================================================  
demographics \
    .repartition('state') \
    .write \
    .csv('staging/demographics/',
         mode='overwrite')

del  demographics_columns, demographics, index_cols, new_names

#### Transform and stage climate data

The climate data needs the most work. The data is first aggregated before its combined with the shapefile data to extract the state from its latitude and longitude columns.

In [None]:
# =============================================================================
# Load shape file
# =============================================================================
state_polygons = fiona.open('shapefiles/cb_2019_us_state_500k.shp')

# =============================================================================
# Initial transformation of climate data
# =============================================================================
climate = climate \
    .toDF(*[c.lower() for c in climate.columns]) \
    .dropna(how='any') \
    .filter(F.col('country') == "United States") \
    .withColumn('dt', F.col('dt').cast(DateType())) \
    .withColumn('year', F.year(F.col('dt'))) \
    .withColumn('month', F.month(F.col('dt'))) \
    .filter(F.col('year') >= 1963)

city_coordinates = climate \
    .select('latitude', 'longitude') \
    .dropDuplicates() \
    .toPandas()

# =============================================================================
# Define transformation functions
# =============================================================================
def _convert_direction_to_plus_minus(coordinate):
    numeric_part = float(coordinate[:-1])
    direction = coordinate[-1].lower()
    converter = {"e": +1,
                 "n": +1,
                 "s": -1,
                 "w": -1}
    return numeric_part * converter[direction]


def _get_state_name_for_coordinates(features, coordinates):
    point = Point(coordinates[::-1])
    feature_dict = {feat['properties']['STUSPS']: shape(feat['geometry'])
                    for feat in features}
    for k, v in feature_dict.items():
        if v.contains(point):
            return k
    for k, v in feature_dict.items():
        if v == min(feature_dict.values(), key=point.distance):
            return k


def convert_coordinates_to_states(features, coordinates):
    coordinates = tuple(_convert_direction_to_plus_minus(item) for item in
                        coordinates)
    return _get_state_name_for_coordinates(features, coordinates)

In [None]:
# =============================================================================
# Retreive US state where city is located
# =============================================================================
states = [convert_coordinates_to_states(state_polygons, row) for row in
          zip(city_coordinates.loc[:, 'latitude'], 
              city_coordinates.loc[:, 'longitude'])]

city_coordinates['state_code'] = states

city_coordinates = spark.createDataFrame(city_coordinates)

In [None]:
# =============================================================================
# Combine climate data with state names
# =============================================================================
climate = climate \
    .join(city_coordinates, on=['latitude', 'longitude'], how='left') \
    .join(F.broadcast(us_states), on=['state_code'], how='inner') \
    .selectExpr('city', 'state', 'year', 'month',
                'round(averagetemperature, 2) average_temperature',
                'round(averagetemperatureuncertainty, 2)'
                'temperature_uncertainty')

In [None]:
# =============================================================================
# Write Spark dataframe to disk
# =============================================================================
climate \
    .repartition('state') \
    .write \
    .csv('staging/climate/',
         mode='overwrite')

del city_coordinates, climate, states, state_polygons, us_states

#### Check the aggregated data

Before uploading the data first to S3 and then to the Redshift cluster we will check that all of the data is in the right format and that there are no unexpected irregularities. So we will load the now much smaller aggregated tables back in an look for irregularities.

In [None]:
test_immigration  = spark \
    .read \
    .csv('staging/immigration/')

test_airlines  = spark \
    .read \
    .csv('staging/airlines/')

test_countries  = spark \
    .read \
    .csv('staging/countries/')

test_demographics  = spark \
    .read \
    .csv('staging/demographics/')

test_climate  = spark \
    .read \
    .csv('staging/climate/')

As the csv files have no header we do not expect to find any meaningful column names. For each table we get the first five rows, count the number of rows in the table and look for the number of missing values in each column.

In [None]:
test_immigration.show(5)

In [None]:
test_immigration.count()

In [None]:
test_immigration.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in test_immigration.columns]).show()

In [None]:
test_airlines.show(5)

In [None]:
test_airlines.count()

In [None]:
test_airlines.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in test_airlines.columns]).show()

In [None]:
test_countries.show(5)

In [None]:
test_countries.count()

In [None]:
test_countries.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in test_countries.columns]).show()

In [None]:
test_demographics.show(5)

In [None]:
test_demographics.count()

In [None]:
test_demographics.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in test_demographics.columns]).show()

In [None]:
test_climate.show(5)

In [None]:
test_climate.count()

In [None]:
test_climate.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in test_climate.columns]).show()

As we can see the columns that contain the primary keys do not contain missing values. That's great. It means we can move on. 

In [None]:
spark.stop()

### 4.2 Setting up AWS infrastructure

We will build the AWS from code directly in the pipeline. Any sensitive information is not stored inside the code but instead in a configuration file (`capstone.cfg`) that we parse into the pipeline.

In [None]:
import configparser
import psycopg2
import boto3
import os

config = configparser.ConfigParser()
config.read('capstone.cfg')


ACCESS_KEY             = config.get('AWS','ACCESS_KEY')
SECRET_KEY             = config.get('AWS','SECRET_KEY')
REGION                 = config.get('S3', 'REGION')
BUCKET_NAME            = config.get('S3', 'BUCKET_NAME')

DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

In [None]:
# =============================================================================
# Create Amazon resource connections
# =============================================================================
s3 = boto3.resource('s3',
                    region_name=REGION,
                    aws_access_key_id=ACCESS_KEY,
                    aws_secret_access_key=SECRET_KEY)
iam = boto3.client('iam',
                   region_name=REGION,
                   aws_access_key_id=ACCESS_KEY,
                   aws_secret_access_key=SECRET_KEY)

redshift = boto3.client('redshift',
                   region_name=REGION,
                   aws_access_key_id=ACCESS_KEY,
                   aws_secret_access_key=SECRET_KEY)

#### Upload data to S3 staging bucket

In [None]:
# =============================================================================
# Create S3 bucket
# =============================================================================
s3.create_bucket(Bucket=BUCKET_NAME,
                 CreateBucketConfiguration={'LocationConstraint': REGION})

In [None]:
# =============================================================================
# Define function to upload all files in a folder to S3
# =============================================================================
def upload_folders(directory_path, s3, bucket_name, prefix):
    for root, dirs, files in os.walk(directory_path):
            for file in files:
                if re.search(r'SUCCESS|\.crc$', file) is None:
                    local_file = os.path.join(root, file)
                    relative_path = os.path.relpath(local_file, directory_path)
                    s3_file = os.path.join(prefix, relative_path)
                    s3.meta.client.upload_file(local_file, bucket_name, s3_file)

In [None]:
# =============================================================================
# Upload files to S3
# =============================================================================
upload_folders('staging/immigration/', s3, BUCKET_NAME, 'staging/immigration/')

In [None]:
upload_folders('staging/airlines', s3, BUCKET_NAME, 'staging/airlines/')

In [None]:
upload_folders('staging/climate/', s3, BUCKET_NAME, 'staging/climate/')

In [None]:
upload_folders('staging/countries/', s3, BUCKET_NAME, 'staging/countries/')

In [None]:
upload_folders('staging/demographics', s3, BUCKET_NAME, 'staging/demographics/')

In [None]:
# Clean up the local staging folder
!rm -r staging

#### Copy staging tables into Redshift

In [None]:
# =============================================================================
# Create IAM role
# =============================================================================
import json
from botocore.exceptions import ClientError


try:
    print("1.1 Creating a new IAM Role") 
    dwhRole = iam.create_role(
        Path='/',
        RoleName=DWH_IAM_ROLE_NAME,
        Description = "Allows Redshift clusters to call AWS services on your behalf.",
        AssumeRolePolicyDocument=json.dumps(
            {'Statement': [{'Action': 'sts:AssumeRole',
               'Effect': 'Allow',
               'Principal': {'Service': 'redshift.amazonaws.com'}}],
             'Version': '2012-10-17'})
    )    
except Exception as e:
    print(e)
    
    
print("1.2 Attaching Policy")

iam.attach_role_policy(RoleName=DWH_IAM_ROLE_NAME,
                       PolicyArn="arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
                      )['ResponseMetadata']['HTTPStatusCode']

print("1.3 Get the IAM role ARN")
roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']

print(roleArn)

In [None]:
# =============================================================================
# Create Redshift cluster
# =============================================================================
try:
    response = redshift.create_cluster(        
        #HW
        ClusterType=DWH_CLUSTER_TYPE,
        NodeType=DWH_NODE_TYPE,
        NumberOfNodes=int(DWH_NUM_NODES),

        #Identifiers & Credentials
        DBName=DWH_DB,
        ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
        MasterUsername=DWH_DB_USER,
        MasterUserPassword=DWH_DB_PASSWORD,
        
        #Roles (for s3 access)
        IamRoles=[roleArn]  
    )
except Exception as e:
    print(e)

In [None]:
def prettyRedshiftProps(props):
    pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k,v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

The next block will run until `ClusterStatus` changes to 'available' before moving on.

In [None]:
import time 
myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
prettyRedshiftProps(myClusterProps)

while myClusterProps['ClusterStatus'] != 'available':
    time.sleep(30)
    prettyRedshiftProps(myClusterProps)
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]

prettyRedshiftProps(myClusterProps)
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
DWH_ROLE_ARN = myClusterProps['IamRoles'][0]['IamRoleArn']
print("DWH_ENDPOINT :: ", DWH_ENDPOINT)
print("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)

In [None]:
%load_ext sql

In [None]:
conn_string=f'postgresql://{DWH_DB_USER}:{DWH_DB_PASSWORD}@{DWH_ENDPOINT}:{DWH_PORT}/{DWH_DB}'
print(conn_string)
%sql $conn_string

In [None]:
# =============================================================================
# Create Redshift tables
# =============================================================================

In [None]:
%%sql

DROP TABLE IF EXISTS city_visitors;
CREATE TABLE IF NOT EXISTS city_visitors (
    city VARCHAR(100) NOT NULL,
    state VARCHAR(30) NOT NULL,
    year SMALLINT NOT NULL,
    month  SMALLINT NOT NULL,
    total_count INTEGER,
    male_count INTEGER,
    female_count INTEGER,
    business_visa_count INTEGER,
    tourist_visa_count INTEGER,
    student_visa_count INTEGER,
    median_visitor_age REAL,
    median_stay REAL,
    CONSTRAINT city_pkey PRIMARY KEY (city, state, year, month))
    DISTSTYLE KEY
    DISTKEY (state)
    COMPOUND SORTKEY (state, city, year, month);

    
DROP TABLE IF EXISTS airline;
CREATE TABLE IF NOT EXISTS airline (
    city VARCHAR(100) NOT NULL,
    state VARCHAR(30) NOT NULL,
    year SMALLINT NOT NULL,
    month  SMALLINT NOT NULL,
    airline_code VARCHAR(3) NOT NULL,
    airline_name VARCHAR(256),
    passenger_number INTEGER,
    CONSTRAINT airline_pkey PRIMARY KEY (city, state, year, month, airline_code))
    DISTSTYLE KEY
    DISTKEY (state)
    COMPOUND SORTKEY (state, city, year, month, airline_code);


DROP TABLE IF EXISTS country_of_origin;
CREATE TABLE IF NOT EXISTS country_of_origin (
    city VARCHAR(100) NOT NULL,
    state VARCHAR(30) NOT NULL,
    year SMALLINT NOT NULL,
    month  SMALLINT NOT NULL,
    country_name VARCHAR(35) NOT NULL,
    citizens_visiting INTEGER,
    residents_visiting INTEGER,
    CONSTRAINT country_pkey PRIMARY KEY (city, state, year, month, country_name))
    DISTSTYLE KEY
    DISTKEY (state)
    COMPOUND SORTKEY (state, city, year, month, country_name);


DROP TABLE IF EXISTS city_demographics;
CREATE TABLE IF NOT EXISTS city_demographics (
    city VARCHAR(100) NOT NULL,
    state VARCHAR(30) NOT NULL,
    total_population INTEGER,
    male_population INTEGER,
    female_population INTEGER,
    veterans INTEGER,
    foreign_born INTEGER,
    asian INTEGER,
    black INTEGER,
    latino INTEGER,
    native_american INTEGER,
    white INTEGER,
    median_population_age REAL,
    average_houshold_size REAL,
    CONSTRAINT demographics_pkey PRIMARY KEY (city, state))
    DISTSTYLE ALL
    COMPOUND SORTKEY (state, city);


DROP TABLE IF EXISTS city_climate;
CREATE TABLE IF NOT EXISTS city_climate (
    city VARCHAR(100) NOT NULL,
    state VARCHAR(30) NOT NULL,
    year SMALLINT NOT NULL,
    month  SMALLINT NOT NULL,
    average_temperature REAL,
    temperature_uncertainty REAL,
    CONSTRAINT climate_pkey PRIMARY KEY (city, state, year, month))
    DISTSTYLE ALL
    COMPOUND SORTKEY (state, city, year, month);

### Load staging tables to Redshift

In [None]:
# Let's define a function to make this more convenient
def copy_staging_table(target_table, bucket, prefix, iam_arn):
    full_source_path = os.path.join(bucket, prefix)
    return f"""COPY {target_table}
            FROM \'{full_source_path}\'
            IAM_ROLE \'{iam_arn}\'
            FORMAT AS CSV;"""


bucket_url = f's3://{BUCKET_NAME}/'

# Now we create the sql-command strings that we will feed to Redshift
copy_immigration = copy_staging_table('city_visitors',
                                         bucket_url,
                                         'staging/immigration/',
                                         DWH_ROLE_ARN)

copy_climate = copy_staging_table('city_climate',
                                  bucket_url,
                                  'staging/climate/',
                                  DWH_ROLE_ARN)

copy_demographics = copy_staging_table('city_demographics',
                                       bucket_url,
                                       'staging/demographics/',
                                       DWH_ROLE_ARN)

copy_airlines = copy_staging_table('airline',
                                   bucket_url,
                                   'staging/airlines/',
                                   DWH_ROLE_ARN)

copy_countries = copy_staging_table('country_of_origin',
                                    bucket_url,
                                    'staging/countries/',
                                    DWH_ROLE_ARN)


In [None]:
# =============================================================================
# Send the copy-commands to Redshift for execution
# =============================================================================
%sql $copy_immigration

In [None]:
%sql $copy_climate

In [None]:
%sql $copy_demographics

In [None]:
%sql $copy_airlines

In [None]:
%sql $copy_countries

#### To clean up we delete the folder containing the staging data on S3

In [None]:
# First we create a bucket instance
staging_bucket = s3.Bucket(BUCKET_NAME, )

# Then we can use the instance to delete folders
_ = staging_bucket.objects.filter(Prefix='staging').delete()

#### 4.2 Data Quality Checks

We shall briefly run a last set of checks on the database tables.  
We start by checking that all keys are unique by checking that the number of rows equals the number of unique compound primary key values.

In [None]:
primary_keys = {'city_visitors': 'city, state, year, month',
                'airline':  'city, state, year, month, airline_code',
                'country_of_origin': 'city, state, year, month, country_name',
                'city_climate': 'city, state, year, month',
                'city_demographics' : 'city, state'}

In [None]:
primary_key_checks = {}
for k, v in  primary_keys.items():
    all_rows = %sql SELECT COUNT(*) FROM {k};
    unique_rows = %sql SELECT COUNT(*) FROM (SELECT DISTINCT {v} FROM {k});
    if all_rows==unique_rows:
        print('==========================================================\n'
              f'All primary keys in table {k} are unique!!!\n'
              f'There are {all_rows[0][0]} records in this table.\n'
              '==========================================================\n')
    else:
         print('==========================================================\n'
              f'ERROR!!. Primary keys in table {k} are NOT unique!!!\n'
              f'There are {all_rows[0][0]} records in this table.\n'
              f'{unique_rows[0][0]} records are unique.\n'
              '==========================================================')

So we can see that all primary keys are unique.  
Finally we have a brief look at the tables to ensure that the data is not garbled. 

In [None]:
%sql SELECT * FROM city_visitors LIMIT 5;

In [None]:
%sql SELECT * FROM airline ORDER BY passenger_number DESC LIMIT 5;

In [None]:
%sql SELECT * FROM country_of_origin ORDER BY residents_visiting DESC LIMIT 5;

In [None]:
%sql SELECT * FROM city_climate LIMIT 5;

In [None]:
%sql SELECT * FROM city_demographics LIMIT 5;

That looks all good as well. That means the pipeline is working!

### 4.3 Data dictionary 

__city_visitors__ table

Table describing the monthly statistics for various overseas non-immigrant visitors arriving by air traffic arrving in certain US cities.

| column name        | description                                   |
| :----------------- | :-------------------------------------------- |
| city               | Name of US city                               |
| state              | Name of US state                              |
| year               | Year as 4 digit integer                       |
| month              | Month as 2 digit integer                      |
| total_count        | Total number of visitors                      |
| male_count         | Number of male visitors                       |
| female_count       | Number of female visitors                     |
|business_visa_count | Number of visitors on a business visa         |
|tourist_visa_count  | Number of visitors on a tourist visa          |
|student_visa_count  | Number of visitors on a student visa          |
|median_visitor_age  | Median age of all visitors in years           |
|median_stay         | Median length of stay of all visitors in days |


__airline__ table

Table describing the airlines and passenger numbers transported by each airline for certain US city per calendar month.

| column name        | description                                             |
| :----------------- | :------------------------------------------------------ |
| city               | Name of US city                                         |
| state              | Name of US state                                        |
| year               | Year as 4 digit integer                                 |
| month              | Month as 2 digit integer                                |
| airline_code       | 2 or 3 alpha- numeric code given to each airline by ICE |
| airline_name       | Full name of airline when available                     |
| passenger_number   | Number of passengers arriving in a city by this airline |


__country_of_origin__ table

Table describing statistics on the citizenship and current country of residence of non-immigrant international visitors to a certain US city per calendar month.

| column name        | description                                                         |
| :----------------- | :------------------------------------------------------------------ |
| city               | Name of US city                                                     |
| state              | Name of US state                                                    |
| year               | Year as 4 digit integer                                             |
| month              | Month as 2 digit integer                                            |
| country_name       | Full name of country                                                |
| citizens_visiting  | Number of citizen from this particular country visiting the US city |
| residents_visiting | Number of residents of this particular country visiting the US city |


__city_demographics__ table

Table describing the total population size, gender and racial make-up of US cities including social statistics.

| column name            | description                                          |
| :--------------------- | :--------------------------------------------------- |
| city                   | Name of US city                                      |
| state                  | Name of US state                                     |
| total_population       | Total number of city inhabitants living in the city  |
| male_population        | Number of male inhabitants living in the city        |
| female_population      | Number of female inhabitants living in the city      |
| veterans               | Number of veterans living in the city                |
| foreign_born           | Number of foreign born people living in the city     |
| asian                  | Number of people of Asian origin living in the city  |
| black                  | Number of black people living in the city            |
| latino                 | Number of people of Latino origin living in the city |
| native_american        | Number of native american people living in the city  |
| white                  | Number of white people living in the city            |
| median_population_age  | Median age of city inhabintants in years             |
| average_household_size | Average number of people per household               |  


__city_climate__ table

Table describing historic temperature trends for certain US cities.

| column name             | description                                          |
| :---------------------- | :--------------------------------------------------- |
| city                    | Name of US city                                      |
| state                   | Name of US state                                     |
| year                    | Year as 4 digit integer                              |
| month                   | Month as 2 digit integer                             |
| average_temperature     | Average monthly temperature in degree Celsius        |
| temperature_uncertainty | Uncertainty of monthly temperature in degree Celsius |

## 5 Final thoughts on the project

### 5.1 Choice of tools and technologies

The reason we used PySpark was that we needed to aggregate a very large amount of data before being able to send it to the Redshift database.
Pandas would not be able to handle a dataset that is 6GB in size.  
Uploading to S3 was necessary as it is the easiest way to stage the aggregated data for copying to Redshift.  
Finally, Redshift, is a good choice when hosting a relational database in the cloud, as it allows for later resizing when the volume of data or queries increases without having to migrate the data to a new database. 

Since the climate and immigration data is naturally aggregated every month it makes sense to run the pipeline on a monthly basis. However, this should be done with some lag time as the median stay has to be calculated from the departure date which will be some time after the arrival date.

### 5.2 Possible changes in the technological approach to future challenges

Whilst the purpose of this project is a case study there are various ways to adjust this pipeline for future challenges. 
If the input of data was suddenly increase in volume by, say 100 times, it would make most sense to host the PySpark step on an Amazon EMR cluster that can be appropriately sized to fit the increased workload. This would most likely also increase the speed of transfer of the parquet files to the S3 staging bucket. The S3 bucket and the Redshift cluster itself will not need resizing as the aggregated data will still be very small after processing with Spark.
In case that the pipeline has to be run everyday in the early morning, at for example 7am, it would make more sense to build out the pipeline in Airflow that makes it easier to automate the process.  
In case of increased user access to the database (say by 100 times) the Redshift cluster needs to be increased in size to cope with the additional queries. 