# Udacity Nanodegree: Data Engineering - Capstone Project
## ETL Pipeline for 2016 US Immigration Data

### Project Summary
This project describes and implements an ETL pipeline to create a database with a star schema representing data concerning immigration into the US in 2016.

The pipeline, which runs on Apache Spark, ingests data of disparate types (including CSV and SAS binary) and outputs tables of cleaned and verified data into carefully modelled schemas in parquet files written to S3, as well as to the local disk.

This allows BI and Data Scientist consumers to ingest and analyse the parquet-formatted data using the Spark cluster that performed the ETL, or using any other tool of their choice.

In [1]:
import configparser
from datetime import datetime, timedelta
from itertools import chain
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import DateType, FloatType, IntegerType, TimestampType
import pandas as pd
import re

import warnings
warnings.filterwarnings('ignore')

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['KEY']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['SECRET']
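
The cell above expects a `dl.cfg` file with an `[AWS]` section containing `KEY` and `SECRET`. As a minimal sketch of that layout, the snippet below parses an in-memory string instead of a file; the values are placeholders invented for illustration, not real credentials.

```python
import configparser

# Hypothetical contents of dl.cfg: only the section and option names
# ([AWS], KEY, SECRET) come from the notebook, the values are placeholders.
EXAMPLE_CFG = """
[AWS]
KEY = EXAMPLE_ACCESS_KEY_ID
SECRET = EXAMPLE_SECRET_ACCESS_KEY
"""

example_config = configparser.ConfigParser()
example_config.read_string(EXAMPLE_CFG)

aws_key = example_config['AWS']['KEY']
aws_secret = example_config['AWS']['SECRET']
print(aws_key, aws_secret)
```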

In [3]:
spark = SparkSession.builder\
    .config('spark.jars.packages','saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0') \
    .enableHiveSupport() \
    .getOrCreate()

## 1: Scope the Project and Gather Data

### 1.1: Scope 
In this project, I will transform several disparate raw data sources into a clean, star-schema database stored in a cloud-based data lake, to allow analysis of data concerning people immigrating to the US using I-94 forms.

I will be using [Apache Spark](https://spark.apache.org/) as my data-processing engine, writing [Parquet](https://parquet.apache.org/) files to [AWS S3](https://aws.amazon.com/s3/).

### 1.2: Describe and Gather Data 

Overview:
1. **I-94 Immigration Data**: this data comes from the [US National Tourism and Trade Office](https://travel.trade.gov/research/reports/i94/historical/2016.html), and has a row for every instance of a person immigrating to the US, showing information about each immigration (e.g., departure and arrival locations, visa type, etc.) 
  1. the core immigration data is a set of SAS binary files that are loaded onto an attached disk to the Spark Cluster on which this notebook is run - each file is of the form `i94_<month>16_sub.sas7bdat`
  2. an ancillary SAS file (`source/I94_SAS_Labels_Descriptions.SAS`) contains schema information about the core data
2. **World Temperature Data**: this data comes from [Kaggle](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) and shows the average temperature per city per month
3. **U.S. City Demographic Data**: this data comes from [OpenDataSoft](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/) and shows the demographics (e.g., population by gender, ethnicity etc.) of US cities
4. **Airport Data**: this data comes from [datahub.io](https://datahub.io/core/airport-codes#data) and is a simple table of airport codes and corresponding cities
5. **ISO-3166 Countries Data**: this data comes (copy-pasted) from [IBAN](https://www.iban.com/country-codes) and shows the ISO 3166 codes for countries
6. **US State Names Data**: this data comes (copy-pasted) from [yourdictionary.com](https://abbreviations.yourdictionary.com/articles/state-abbrev.html) and shows the standard names and abbreviations for the 50 US States

Below, each data source is imported, the first 5 rows of each source are printed, and a data dictionary (using the results of `.printSchema()` along with best-attempt field descriptions from the data sources) is displayed.

#### 1.2.1: I-94 Immigration Data

In [4]:
# Load I-94 immigration data subset (April 2016) into a Spark DataFrame for initial data exploration
fname_i94immigration = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df_spark_i94immigration_raw = spark.read.format('com.github.saurfang.sas.spark') \
    .load(fname_i94immigration)

In [5]:
df_spark_i94immigration_raw.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


Data dictionary:

| Column   | Type   | Description |
| -------- | ------ | ----------- |
| cicid    | double | ID of Immigration |
| i94yr    | double | Year of immigration (as: 4 digit e.g., 2020) |
| i94mon   | double | Month of immigration (as: numeric, 1-based since April is 4.0) |
| i94cit   | double | Country of citizenship (as: 3 digit code) |
| i94res   | double | Country of residence (as: 3 digit code) |
| i94port  | string | Port of entry (as: 3 letter code) |
| arrdate  | double | Date of arrival (as: days since 1/1/1960) |
| i94mode  | double | Mode of travel (as: 1 = Air, 2 = Sea, 3 = Land, 9 = Not Reported) |
| i94addr  | string | State of entry (as: 2 letter code, e.g., CA for California) |
| depdate  | double | Date of departure (as: days since 1/1/1960) |
| i94bir   | double | Age (in years) |
| i94visa  | double | Visa code (as: 1 = Business, 2 = Pleasure, 3 = Student) |
| count    | double | Used for summary statistics |
| dtadfile | string | Character Date Field - Date added to I-94 Files |
| visapost | string | Department of State where Visa was issued |
| occup    | string | Occupation that will be performed in U.S. |
| entdepa  | string | Arrival Flag - admitted or paroled into the U.S. |
| entdepd  | string | Departure Flag - Departed, lost I-94 or is deceased |
| entdepu  | string | Update Flag - Either apprehended, overstayed, adjusted to perm residence |
| matflag  | string | Match flag - Match of arrival and departure records |
| biryear  | double | Year of birth (as: 4 digits) |
| dtaddto  | string | Character Date Field - Date to which admitted to U.S. (allowed to stay until) |
| gender   | string | Non-immigrant sex |
| insnum   | string | INS number |
| airline  | string | Airline used to arrive in U.S. |
| admnum   | double | Admission Number |
| fltno    | string | Flight number of Airline used to arrive in U.S. |
| visatype | string | Class of admission legally admitting the non-immigrant to temporarily stay in U.S. |
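
The `arrdate` and `depdate` values are SAS dates, i.e. day counts from 1 January 1960. As a quick sanity check in plain Python, the sketch below converts two `arrdate` values from the sample rows above; the helper name `sas_days_to_date` is introduced here for illustration only.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this date

def sas_days_to_date(days):
    """Convert a SAS date value (days since 1960-01-01) to a date, or None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))

# arrdate values taken from the sample rows above
print(sas_days_to_date(20573.0))  # 2016-04-29
print(sas_days_to_date(20545.0))  # 2016-04-01
```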

#### 1.2.2: World Temperature Data

In [6]:
fname_worldtemperature = '../../data2/GlobalLandTemperaturesByCity.csv'
df_spark_worldtemperature_raw = spark.read.format('csv') \
    .option('header', 'true').option('sep', ',') \
    .load(fname_worldtemperature)

In [7]:
df_spark_worldtemperature_raw.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


Data dictionary:

| Column                        | Type   | Description |
| ----------------------------- | ------ | ----------- |
| dt                            | string | Date of record, always the first of a month (as: YYYY-MM-DD) |
| AverageTemperature            | string | Average temperature of city for that month (as: Celsius) |
| AverageTemperatureUncertainty | string | The 95% confidence interval around the average |
| City                          | string | City |
| Country                       | string | Country of the city |
| Latitude                      | string | Latitude of the city (as: \<decimal degrees\>N or \<decimal degrees\>S) |
| Longitude                     | string | Longitude of the city (as: \<decimal degrees\>E or \<decimal degrees\>W) |

#### 1.2.3: U.S. City Demographic Data

In [8]:
fname_uscitydemographic = 'source/us-cities-demographics.csv'
df_spark_uscitydemographic_raw = spark.read.format('csv') \
    .option('header', 'true') \
    .option('sep', ';') \
    .load(fname_uscitydemographic)

In [9]:
df_spark_uscitydemographic_raw.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


Data dictionary:

| Column                 | Type   | Description |
| ---------------------- | ------ | ----------- |
| City                   | string | City |
| State                  | string | State in which the city is located |
| Median Age             | string | Median age of residents of the city (as: years) |
| Male Population        | string | Count of males in the city |
| Female Population      | string | Count of females in the city |
| Total Population       | string | Count of people in the city |
| Number of Veterans     | string | Count of veterans in the city |
| Foreign-born           | string | Count of foreign-born people in the city |
| Average Household Size | string | Mean household size in the city |
| State Code             | string | State in which the city is located (as: 2 letter code) |
| Race                   | string | A race (e.g., "Hispanic or Latino") |
| Count                  | string | The number of people of the given race in the city |

#### 1.2.4: Airport Data

In [10]:
fname_airport = 'source/airport-codes_csv.csv'
df_spark_airport_raw = spark.read.format('csv') \
    .option('header', 'true').option('sep', ',') \
    .load(fname_airport)

In [11]:
df_spark_airport_raw.limit(5).toPandas()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


Data dictionary:

| Column       | Type   | Description |
| ------------ | ------ | ----------- |
| ident        | string | Airport code (as: ICAO or IATA format) |
| type         | string | Type of airport (as one of: large_airport, balloonport, seaplane_base, heliport, closed, medium_airport, small_airport) |
| name         | string | Name of airport |
| elevation_ft | string | Elevation of airport (as: feet) |
| continent    | string | Continent in which the airport is located (as one of: NA, SA, AS, AN, OC, EU, AF) |
| iso_country  | string | Country in which the airport is located (as: ISO-3166 format) |
| iso_region   | string | Region in which the airport is located (as: ISO-3166 format) |
| municipality | string | City in which the airport is located |
| gps_code     | string | GPS code of the airport |
| iata_code    | string | Airport code (as: IATA format) |
| local_code   | string | Local code of the airport |
| coordinates  | string | Coordinates of the airport (as: \<decimal degrees longitude\>, \<decimal degrees latitude\>) |

This data will not be used further in the project, as there are no columns that can be joined with any of the other tables. (Note: `i94port` in the _I-94 Immigration Data_ does **not** correspond to either `ident` or `iata_code` in the _Airport Data_.)

#### 1.2.5: ISO-3166 Countries Data

In [12]:
fname_iso3166countries = 'source/iso-3166-countries.csv'
df_spark_iso3166countries_raw = spark.read.format('csv') \
    .option('header', 'true').option('sep', '\t') \
    .load(fname_iso3166countries)

In [13]:
df_spark_iso3166countries_raw.limit(5).toPandas()

Unnamed: 0,Country,Alpha-2 code,Alpha-3 code,Numeric
0,Afghanistan,AF,AFG,4
1,Albania,AL,ALB,8
2,Algeria,DZ,DZA,12
3,American Samoa,AS,ASM,16
4,Andorra,AD,AND,20


Data dictionary:

| Column       | Type   | Description |
| ------------ | ------ | ----------- |
| Country      | string | Country name |
| Alpha-2 code | string | Country code (as: 2 character code) |
| Alpha-3 code | string | Country code (as: 3 character code) |
| Numeric      | string | Country code (as: 3 digits) |

#### 1.2.6: US States Names Data

In [14]:
fname_usstatesnames = 'source/us-states.csv'
df_spark_usstatesnames_raw = spark.read.format('csv') \
    .option('header', 'true').option('sep', '-') \
    .load(fname_usstatesnames)

In [15]:
df_spark_usstatesnames_raw.limit(5).toPandas()

Unnamed: 0,Name,Code
0,Alabama,AL
1,Alaska,AK
2,Arizona,AZ
3,Arkansas,AR
4,California,CA


Data dictionary:

| Column | Type   | Description |
| ------ | ------ | ----------- |
| Name   | string | State name |
| Code   | string | State code (as: 2 character code) |

## 2: Explore and Assess the Data
### 2.1: Explore the Data 

#### 2.1.1: I-94 Immigration Data

In [16]:
df_spark_i94immigration = df_spark_i94immigration_raw

The _I-94 Immigration Data_ does not contain any duplicate rows:

In [17]:
df_spark_i94immigration.count() == df_spark_i94immigration.dropDuplicates().count()

True

However, some columns have:
1. definitions/values that are not well explained/understood
 1. the columns `count`, `dtadfile`, `visapost`, `occup`, `entdepa`, `entdepd`, `entdepu`, `matflag`, `dtaddto`, `insnum`, `admnum` are not well explained or understood, so they should be removed
2. unsuitable types
 1. integer: the columns `cicid`, `i94yr`, `i94mon`, `i94mode`, `i94bir`, `i94visa`, `biryear` should be of `IntegerType`
 2. date: the columns `arrdate`, `depdate` should be of `DateType`
 3. string (enum): after the integer cast, the columns `i94mode` and `i94visa` should be mapped to `StringType` (using their enumerated values from `source/I94_SAS_Labels_Descriptions.SAS`)
3. non-standard formats
 1. iso-3166: the columns `i94cit` and `i94res` are in a non-standard format, and should be converted to `ISO-3166: Alpha-3` format
4. invalid values
 1. states: the column `i94addr` has values that are not valid US states
5. unsemantic names
 1. many of the columns have names that are hard to understand
6. redundant data
 1. the `i94yr` and `i94mon` columns are made redundant by the `arrdate` column

#### 2.1.2: World Temperature Data

In [18]:
df_spark_worldtemperature = df_spark_worldtemperature_raw

The _World Temperature Data_ does not contain any duplicate rows:

In [19]:
df_spark_worldtemperature.count() == df_spark_worldtemperature.dropDuplicates().count()

True

However, some columns have:
1. unsuitable types
 1. numeric: the columns `AverageTemperature`, `AverageTemperatureUncertainty` should be numeric (the cleaning step casts them to `IntegerType`, which drops the fractional part; `FloatType` would preserve it)
 2. date: the column `dt` should be of `DateType`
2. non-standard formats
 1. iso-3166: the column `Country` is in a non-standard format, and should be converted to `ISO-3166: Alpha-3`
3. unsemantic names
 1. the column naming is not always clear (e.g., `dt` means `Date`)
4. out-of-range data
 1. the date range of this data is 1743-2013, so we cannot match it with the core immigration dataset by date (which covers 2016); instead, an average over time per city (e.g., over the 20-year period 1993-2013) is suitable
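
The per-city averaging described in the last point can be sketched without Spark. The readings below are hypothetical values made up for illustration (they are not taken from the dataset); the logic keeps non-null readings from 1993 onwards and takes the mean per city.

```python
from collections import defaultdict
from datetime import date
from statistics import mean

# Hypothetical (date, city, average temperature) readings, for illustration only
readings = [
    (date(1992, 12, 1), 'City A', 1.0),   # before the window: ignored
    (date(1993, 1, 1), 'City A', 2.0),
    (date(2013, 8, 1), 'City A', 4.0),
    (date(2000, 6, 1), 'City B', None),   # missing reading: ignored
    (date(2000, 7, 1), 'City B', 20.5),
]

WINDOW_START = date(1993, 1, 1)

by_city = defaultdict(list)
for day, city, temperature in readings:
    if temperature is not None and day >= WINDOW_START:
        by_city[city].append(temperature)

city_means = {city: mean(values) for city, values in by_city.items()}
print(city_means)  # {'City A': 3.0, 'City B': 20.5}
```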

#### 2.1.3: U.S. City Demographic Data

In [20]:
df_spark_uscitydemographic = df_spark_uscitydemographic_raw

The _U.S. City Demographic Data_ does not contain any duplicate rows:

In [21]:
df_spark_uscitydemographic.count() == df_spark_uscitydemographic.dropDuplicates().count()

True

However, some columns have:
1. unsuitable types
 1. integer: the columns `Male Population`, `Female Population`, `Total Population`, `Number of Veterans`, `Foreign-born`, `Count` should be of `IntegerType`
 2. float: the columns `Median Age`, `Average Household Size` should be of `FloatType`
2. unsemantic names
 1. the naming convention is not consistent across the columns
3. redundant data
 1. every row for a given city has identical data in all columns except `Race` and `Count`, so the table can be pivoted to have only one row per city and a column containing the population count for each race
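
To make the pivot concrete, here is a small plain-Python sketch of the reshaping. The rows are hypothetical (city names and counts are invented for illustration); each `(city, state)` pair ends up as a single record with one `<race> Population` entry per race, defaulting to 0 when a race has no row for that city.

```python
# Hypothetical long-format rows: (city, state, total population, race, count)
rows = [
    ('City A', 'State X', 1000, 'Asian', 100),
    ('City A', 'State X', 1000, 'White', 600),
    ('City B', 'State Y', 500, 'White', 300),
]

races = sorted({row[3] for row in rows})

pivoted = {}
for city, state, total, race, count in rows:
    # one output record per (city, state); race columns default to 0
    record = pivoted.setdefault(
        (city, state),
        {'Total Population': total, **{f'{r} Population': 0 for r in races}},
    )
    record[f'{race} Population'] = count

for key, record in pivoted.items():
    print(key, record)
```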

#### 2.1.4: ISO-3166 Countries Data

In [22]:
df_spark_iso3166countries = df_spark_iso3166countries_raw

The _ISO-3166 Countries Data_ does not contain any duplicate rows:

In [23]:
df_spark_iso3166countries.count() == df_spark_iso3166countries.dropDuplicates().count()

True

No other issues were found in this data.

#### 2.1.5: US States Names Data

In [24]:
df_spark_usstatesnames = df_spark_usstatesnames_raw

The _US States Names Data_ does not contain any duplicate rows:

In [25]:
df_spark_usstatesnames.count() == df_spark_usstatesnames.dropDuplicates().count()

True

### 2.2: Cleaning the Data

#### 2.2.1: I-94 Immigration Data

##### 2.2.1.1: Definitions/values that are not well explained/understood

In [26]:
unneeded_columns = [
    'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu', 'matflag', 'dtaddto', 'insnum', 'admnum',
]
df_spark_i94immigration = df_spark_i94immigration.drop(*unneeded_columns)

##### 2.2.1.2: Unsuitable types

In [27]:
# columns that should be IntegerType
integer_type_columns = [
    'cicid', 'i94yr', 'i94mon', 'i94mode', 'i94bir', 'i94visa', 'biryear',
    'i94cit', 'i94res', # note - these two will be further converted in 2.2.1.3
]
for column in df_spark_i94immigration.columns:
    if column in integer_type_columns:
        df_spark_i94immigration = df_spark_i94immigration \
            .withColumn(column, df_spark_i94immigration[column].cast(IntegerType()))

# columns that should be DateType
udf_date_from_sas = F.udf(lambda x: x if x is None else (datetime(1960, 1, 1) + timedelta(days=x)), DateType())
date_type_columns = [
    'arrdate', 'depdate',
]
for column in df_spark_i94immigration.columns:
    if column in date_type_columns:
        df_spark_i94immigration = df_spark_i94immigration \
            .withColumn(column, udf_date_from_sas(df_spark_i94immigration[column]))

# columns that should be string enums (for readability)
mapping_i94mode_expr = F.create_map(
    [F.lit(x) for x in chain(*({ 1: 'Air', 2: 'Sea', 3: 'Land', 9: 'Not Reported' }).items())]
)
mapping_i94visa_expr = F.create_map(
    [F.lit(x) for x in chain(*({ 1: 'Business', 2: 'Pleasure', 3: 'Student' }).items())]
)

df_spark_i94immigration = df_spark_i94immigration \
    .withColumn('i94mode', mapping_i94mode_expr.getItem(F.col('i94mode'))) \
    .withColumn('i94visa', mapping_i94visa_expr.getItem(F.col('i94visa')))
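
`F.create_map` takes its arguments as an alternating sequence of keys and values, which is why the dictionaries in the cell above are flattened with `chain(*d.items())`. The flattening step on its own, in plain Python (the variable names here are illustrative):

```python
from itertools import chain

i94mode_labels = {1: 'Air', 2: 'Sea', 3: 'Land', 9: 'Not Reported'}

# items() yields (key, value) pairs; chain(*pairs) flattens them to
# key1, value1, key2, value2, ... which is the argument order create_map expects
flattened = list(chain(*i94mode_labels.items()))
print(flattened)  # [1, 'Air', 2, 'Sea', 3, 'Land', 9, 'Not Reported']
```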

##### 2.2.1.3: Non-standard formats

In [28]:
## transform i94cit and i94res columns into the ISO-3166 Alpha-3 format (in steps (a), (b) and (c) below)

# a)
# create mapping from: I94 name -> I94 code
# (where I94 name is the country label and I94 code is the format of the i94cit/i94res columns)
# by parsing source/I94_SAS_Labels_Descriptions.SAS

fname_saslabels_raw = 'source/I94_SAS_Labels_Descriptions.SAS'
df_spark_saslabels_raw = spark.read.text(fname_saslabels_raw)

data = []
has_seen_value_i94cntyl = False
for row in df_spark_saslabels_raw.collect():
    value = row.value
    if (not has_seen_value_i94cntyl) and ('i94cntyl' in value):
        has_seen_value_i94cntyl = True
        continue
    if has_seen_value_i94cntyl and (not value.strip()):
        break
    if has_seen_value_i94cntyl:
        data.append((
            value.split('=')[1].strip().strip(';').strip().strip('\'').strip(),
            value.split('=')[0].strip()
        ))
df_spark_i94countries = spark.createDataFrame(data, ['I94 name', 'I94 code'])
df_spark_i94countries = df_spark_i94countries \
    .withColumn('I94 code', df_spark_i94countries['I94 code'].cast(IntegerType()))

df_spark_i94countries.limit(5).toPandas()

Unnamed: 0,I94 name,I94 code
0,"MEXICO Air Sea, and Not Reported (I-94, no lan...",582
1,AFGHANISTAN,236
2,ALBANIA,101
3,ALGERIA,316
4,ANDORRA,102
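
The string handling in step (a) can be tried without Spark. The sample lines below are an assumption about the layout of the `i94cntyl` block (a numeric code, `=`, then a quoted label, with the final entry closed by `;`); the labels and codes themselves match the output above. The helper `parse_label_line` is introduced here for illustration, and unlike the cell above it splits on the first `=` only.

```python
def parse_label_line(line):
    """Split a "code = 'label'" line into (label, code), as in step (a)."""
    code, label = line.split('=', 1)
    label = label.strip().strip(';').strip().strip("'").strip()
    return label, int(code.strip())

# Assumed sample lines; the exact spacing in the real file may differ
sample_lines = [
    "   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'",
    "   236 =  'AFGHANISTAN'",
    "   101 =  'ALBANIA' ;",
]

for line in sample_lines:
    print(parse_label_line(line))
```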


In [29]:
# b)
# create mapping from: I94 code -> Alpha-3 code
# (where I94 code is the format of the i94cit/i94res columns, and Alpha-3 code is an ISO-3166 standard)

# i)
# create a 'Country (normalized)' column in df_spark_iso3166countries to more closely resemble the format 
# of the 'I94 name' column in df_spark_i94countries
# (upper-case, remove bracketed qualifiers such as "(the)", trim trailing whitespace)
df_spark_iso3166countries = df_spark_iso3166countries \
    .withColumn('Country (normalized)', F.upper(df_spark_iso3166countries['Country']))
df_spark_iso3166countries = df_spark_iso3166countries \
    .withColumn('Country (normalized)', F.regexp_replace(df_spark_iso3166countries['Country (normalized)'], r'[\(\[].*?[\)\]]', ''))
df_spark_iso3166countries = df_spark_iso3166countries \
    .withColumn('Country (normalized)', F.regexp_replace(df_spark_iso3166countries['Country (normalized)'], r'[ \t]+$', ''))

# ii)
# perform manual replacements on countries that haven't found a match
# the manual task of populating this list (of 58 countries) can be completed by comparing:
#  the value in the 'Country' column of the rows in df_spark_countries that have a null 'I94 code', with
#  the closest corresponding value in the 'I94 name' column in df_spark_i94countries
manual_replace_list = [
    ('ANTIGUA AND BARBUDA', 'ANTIGUA-BARBUDA'),
    # ...
    ('CHINA', 'CHINA, PRC'),
    # ...
    ('MEXICO', 'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'),
    # ...
    ('UNITED KINGDOM OF GREAT BRITAIN AND NORTHERN IRELAND', 'UNITED KINGDOM'),
    # ... etc...
]
for k,v in manual_replace_list:
    df_spark_iso3166countries = df_spark_iso3166countries.replace(k, v, 'Country (normalized)')

# iii)
# join df_spark_iso3166countries and df_spark_i94countries
df_spark_countries = df_spark_iso3166countries.join(
    df_spark_i94countries,
    df_spark_iso3166countries['Country (normalized)'] == df_spark_i94countries['I94 name'],
    how='left',
)

df_spark_countries.select('Country', 'Alpha-3 code', 'I94 code').sort(F.asc('Alpha-3 code')).limit(4).toPandas()

Unnamed: 0,Country,Alpha-3 code,I94 code
0,Aruba,ABW,532
1,Afghanistan,AFG,236
2,Angola,AGO,324
3,Anguilla,AIA,529
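
The normalization in step (b.i) uses the same two regular expressions that Python's `re` module understands, so its effect can be checked on a few ISO-3166 names without Spark. The function name `normalize_country_name` is introduced here for illustration.

```python
import re

def normalize_country_name(name):
    """Upper-case, drop bracketed qualifiers, and trim trailing whitespace."""
    name = name.upper()
    name = re.sub(r'[\(\[].*?[\)\]]', '', name)
    return re.sub(r'[ \t]+$', '', name)

print(normalize_country_name('Bahamas (the)'))                     # BAHAMAS
print(normalize_country_name('Bolivia (Plurinational State of)'))  # BOLIVIA
print(normalize_country_name('Albania'))                           # ALBANIA
```

Names that still differ after this step (for example `CHINA` versus `CHINA, PRC`) are the ones handled by the manual replacement list.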


In [30]:
# c)
# replace i94cit and i94res in df_spark_i94immigration with ISO-3166 Alpha-3 codes
i94immigration_column_names = df_spark_i94immigration.columns

def replace_i94_code_with_iso3166(
    column_name,
):
    global df_spark_i94immigration
    global df_spark_countries
    df_spark_i94immigration = df_spark_i94immigration.join(
        df_spark_countries,
        df_spark_i94immigration[column_name] == df_spark_countries['I94 code'],
        how='left'
    ).drop(
        column_name
    ).withColumnRenamed(
        'Alpha-3 code', column_name
    ).select(*i94immigration_column_names)

replace_i94_code_with_iso3166('i94cit')
replace_i94_code_with_iso3166('i94res')

df_spark_i94immigration = df_spark_i94immigration.sort(F.asc('cicid'))

df_spark_i94immigration.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,biryear,gender,airline,fltno,visatype
0,6,2016,4,ECU,ECU,XXX,2016-04-29,,,,37,Pleasure,1979,,,,B2
1,7,2016,4,,,ATL,2016-04-07,Air,AL,,25,Student,1991,M,,296.0,F1
2,15,2016,4,ALB,ALB,WAS,2016-04-01,Air,MI,2016-08-25,55,Pleasure,1961,M,OS,93.0,B2
3,16,2016,4,ALB,ALB,NYC,2016-04-01,Air,MA,2016-04-23,28,Pleasure,1988,,AA,199.0,B2
4,17,2016,4,ALB,ALB,NYC,2016-04-01,Air,MA,2016-04-23,4,Pleasure,2012,,AA,199.0,B2


##### 2.2.1.4: Invalid values

In [31]:
# set the value of i94addr to None if it is not a valid US state
states = df_spark_usstatesnames.select('Code').toPandas()['Code']
df_spark_i94immigration = df_spark_i94immigration.withColumn(
    'i94addr',
    F.when(
        F.col('i94addr').isin(*states),
        F.col('i94addr')
    ).otherwise(None)
)

##### 2.2.1.5: Unsemantic names

In [32]:
column_rename = [
    ('cicid', 'Id'),
    ('i94yr', 'Year'),
    ('i94mon', 'Month'),
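    # NOTE: the trailing ')' in the next two names is a typo; it is stripped by normalize_column_names in 2.2.4
    ('i94cit', 'Country Citizenship)'),
    ('i94res', 'Country Residence)'),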
    ('i94port', 'Port Of Arrival'),
    ('arrdate', 'Date Of Arrival'),
    ('i94mode', 'Mode Of Travel'),
    ('i94addr', 'State Of Arrival'),
    ('depdate', 'Date Of Departure'),
    ('i94bir', 'Age'),
    ('i94visa', 'Visa Category'),
    ('biryear', 'Year Of Birth'),
    ('gender', 'Gender'),
    ('airline', 'Airline'),
    ('fltno', 'Flight Number'),
    ('visatype', 'Visa Type'),
]

for k,v in column_rename:
    df_spark_i94immigration = df_spark_i94immigration.withColumnRenamed(k, v)

##### 2.2.1.6: Redundant columns

In [33]:
redundant_columns = ['Year', 'Month']

for column in redundant_columns:
    df_spark_i94immigration = df_spark_i94immigration.drop(column)

In [34]:
df_spark_i94immigration.limit(5).toPandas()

Unnamed: 0,Id,Country Citizenship),Country Residence),Port Of Arrival,Date Of Arrival,Mode Of Travel,State Of Arrival,Date Of Departure,Age,Visa Category,Year Of Birth,Gender,Airline,Flight Number,Visa Type
0,6,ECU,ECU,XXX,2016-04-29,,,,37,Pleasure,1979,,,,B2
1,7,,,ATL,2016-04-07,Air,AL,,25,Student,1991,M,,296.0,F1
2,15,ALB,ALB,WAS,2016-04-01,Air,MI,2016-08-25,55,Pleasure,1961,M,OS,93.0,B2
3,16,ALB,ALB,NYC,2016-04-01,Air,MA,2016-04-23,28,Pleasure,1988,,AA,199.0,B2
4,17,ALB,ALB,NYC,2016-04-01,Air,MA,2016-04-23,4,Pleasure,2012,,AA,199.0,B2


#### 2.2.2: World Temperature Data

##### 2.2.2.1: Unsuitable types

In [35]:
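# columns that should be IntegerType
# NOTE: casting to IntegerType drops the fractional part of each temperature; FloatType would preserve it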
integer_type_columns = [
    'AverageTemperature', 'AverageTemperatureUncertainty',
]
for column in df_spark_worldtemperature.columns:
    if column in integer_type_columns:
        df_spark_worldtemperature = df_spark_worldtemperature \
            .withColumn(column, df_spark_worldtemperature[column].cast(IntegerType()))

# columns that should be DateType
udf_date_from_worldtemperature = F.udf(lambda x: x if x is None else datetime.strptime(x, '%Y-%m-%d'), DateType())
date_type_columns = [
    'dt',
]
for column in df_spark_worldtemperature.columns:
    if column in date_type_columns:
        df_spark_worldtemperature = df_spark_worldtemperature \
            .withColumn(column, udf_date_from_worldtemperature(df_spark_worldtemperature[column]))

##### 2.2.2.2: Non-standard formats

In [36]:
# transform Country column into the ISO-3166 Alpha-3 format
worldtemperature_column_names = df_spark_worldtemperature.columns

# manually transform countries whose names don't match up with the name in ISO-3166 standard
worldtemperature_country_rename_dict = {
    'Bahamas': 'Bahamas (the)',
    'Bolivia': 'Bolivia (Plurinational State of)',
    'Bosnia And Herzegovina': 'Bosnia and Herzegovina',
    'Burma': 'Myanmar',
    'Central African Republic': 'Central African Republic (the)',
    'Congo': 'Congo (the)',
    'Congo (Democratic Republic Of The)': 'Congo (the Democratic Republic of the)',
    'Czech Republic': 'Czechia',
    'Côte D\'Ivoire': 'Côte d\'Ivoire',
    'Dominican Republic': 'Dominican Republic (the)',
    'Gambia': 'Gambia (the)',
    'Guinea Bissau': 'Guinea-Bissau',
    'Iran': 'Iran (Islamic Republic of)',
    'Laos': 'Lao People\'s Democratic Republic (the)',
    'Macedonia': 'Republic of North Macedonia',
    'Moldova': 'Moldova (the Republic of)',
    'Netherlands': 'Netherlands (the)',
    'Niger': 'Niger (the)',
    'Philippines': 'Philippines (the)',
    'Reunion': 'Réunion',
    'Russia': 'Russian Federation (the)',
    'South Korea': 'Korea (the Republic of)',
    'Sudan': 'Sudan (the)',
    'Swaziland': 'Eswatini',
    'Syria': 'Syrian Arab Republic',
    'Taiwan': 'Taiwan (Province of China)',
    'Tanzania': 'Tanzania, United Republic of',
    'United Arab Emirates': 'United Arab Emirates (the)',
    'United Kingdom': 'United Kingdom of Great Britain and Northern Ireland (the)',
    'United States': 'United States of America (the)',
    'Venezuela': 'Venezuela (Bolivarian Republic of)',
    'Vietnam': 'Viet Nam',
}
# note: when the first argument to replace() is a dict, the value argument (1 here) is ignored
df_spark_worldtemperature = df_spark_worldtemperature \
    .replace(worldtemperature_country_rename_dict, 1, 'Country')

df_spark_worldtemperature = df_spark_worldtemperature.join(
        df_spark_iso3166countries,
        ['Country'],
        how='left'
) \
    .withColumnRenamed('Country', 'Country Name') \
    .withColumnRenamed('Alpha-3 code', 'Country') \
    .select(*worldtemperature_column_names, 'Country Name')

##### 2.2.2.3: Unsemantic names

In [37]:
column_rename = [
    ('dt', 'Date'),
    ('AverageTemperature', 'Average Temperature'),
    ('AverageTemperatureUncertainty', 'Average Temperature Uncertainty'),
]

for k,v in column_rename:
    df_spark_worldtemperature = df_spark_worldtemperature.withColumnRenamed(k, v)

##### 2.2.2.4: Out-of-range data

In [39]:
## the date range of this data is 1743-2013, so we cannot match this data with the core immigration dataset by date
## therefore, an average over time (e.g., average over the 20 year period from 1993-2013) per city is suitable
df_spark_worldtemperature = df_spark_worldtemperature \
    .filter(F.col('Average Temperature').isNotNull()) \
    .filter(F.col('Date') >= F.lit('1993-01-01')) \
    .groupBy('City') \
    .agg(
        F.mean('Average Temperature').alias('Average Temperature'),
        F.first('Country').alias('Country'),
        F.first('Country Name').alias('Country Name'),
        F.first('Latitude').alias('Latitude'),
        F.first('Longitude').alias('Longitude'),
    )

In [40]:
df_spark_worldtemperature.limit(5).toPandas()

Unnamed: 0,City,Average Temperature,Country,Country Name,Latitude,Longitude
0,Antwerp,10.451613,BEL,Belgium,50.63N,3.80E
1,Araruama,24.189516,BRA,Brazil,23.31S,42.82W
2,Bangalore,25.080645,IND,India,12.05N,77.26E
3,Benxi,8.181452,CHN,China,40.99N,123.55E
4,Cajamarca,16.931452,PER,Peru,7.23S,78.65W


#### 2.2.3: U.S. City Demographic Data

##### 2.2.3.1: Unsuitable types

In [41]:
# columns that should be IntegerType
integer_type_columns = [
    'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Count',
]
for column in df_spark_uscitydemographic.columns:
    if column in integer_type_columns:
        df_spark_uscitydemographic = df_spark_uscitydemographic \
            .withColumn(column, df_spark_uscitydemographic[column].cast(IntegerType()))
        
# columns that should be FloatType
float_type_columns = [
    'Median Age', 'Average Household Size',
]
for column in df_spark_uscitydemographic.columns:
    if column in float_type_columns:
        df_spark_uscitydemographic = df_spark_uscitydemographic \
            .withColumn(column, df_spark_uscitydemographic[column].cast(FloatType()))

##### 2.2.3.2: Unsemantic names

In [42]:
column_rename = [
    ('Number of Veterans', 'Veteran Population'),
    ('Foreign-born', 'Foreign Born Population'),
    ('Count', 'Race Count')
]

for k,v in column_rename:
    df_spark_uscitydemographic = df_spark_uscitydemographic.withColumnRenamed(k, v)

##### 2.2.3.3: Redundant data

In [43]:
## every row for a given city has identical data in all columns except Race and Race Count,
## so the table can be pivoted to have only one row per city
column_names = df_spark_uscitydemographic.columns

races = list(df_spark_uscitydemographic.select('Race').dropDuplicates().sort(F.asc('Race')).toPandas()['Race'])

for race in races:
    df_spark_uscitydemographic = df_spark_uscitydemographic \
        .withColumn(race, F.when(F.col('Race') == race, F.col('Race Count')).otherwise(0))

df_spark_uscitydemographic = df_spark_uscitydemographic \
    .groupBy('City', 'State') \
    .agg(
        *map(lambda x: F.first(x).alias(x), filter(lambda y: y not in ['City', 'State', 'Race', 'Race Count'], column_names)),
        *map(lambda x: F.max(x).alias(x + ' Population'), races),
    )

In [44]:
df_spark_uscitydemographic.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Veteran Population,Foreign Born Population,Average Household Size,State Code,American Indian and Alaska Native Population,Asian Population,Black or African-American Population,Hispanic or Latino Population,White Population
0,Cincinnati,Ohio,32.700001,143654,154883,298537,13699,16896,2.08,OH,3362,7633,133430,9121,162245
1,Kansas City,Kansas,33.400002,74606,76655,151261,8139,25507,2.71,KS,2749,7301,40177,44342,96113
2,Lynchburg,Virginia,28.700001,38614,41198,79812,4322,4364,2.48,VA,1024,2910,23271,2689,53727
3,Auburn,Washington,37.099998,36837,39743,76580,5401,14842,2.73,WA,3042,12341,4032,10836,58293
4,Dayton,Ohio,32.799999,66631,73966,140597,8465,7381,2.26,OH,2010,1885,57280,4945,86016
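
The pivot above can be hard to follow in Spark syntax, so here is a minimal plain-Python sketch of the same idea. The sample rows are illustrative and are not read from the source file. Each race gets its own `<Race> Population` column, and a city with no row for a given race ends up with `0` rather than a null.

```python
from collections import defaultdict

# Illustrative input rows: (City, State, Race, Race Count)
rows = [
    ('Dayton', 'Ohio', 'White', 86016),
    ('Dayton', 'Ohio', 'Asian', 1885),
    ('Auburn', 'Washington', 'White', 58293),
]

races = sorted({race for _, _, race, _ in rows})

# One output row per (City, State), with one '<Race> Population' column per race.
pivoted = defaultdict(lambda: {race + ' Population': 0 for race in races})
for city, state, race, count in rows:
    column = race + ' Population'
    # max() mirrors F.max over the helper columns, which hold 0 on non-matching rows
    pivoted[(city, state)][column] = max(pivoted[(city, state)][column], count)

for key, populations in pivoted.items():
    print(key, populations)
```

Taking the maximum works only because the helper columns are filled with `0` for non-matching rows and race counts are never negative.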


#### 2.2.4 Normalize column names
Some formats (including Parquet) don't allow certain characters, such as spaces, in column names. Each name is therefore title-cased and stripped of every character other than letters, digits and `$`.

In [45]:
def normalize_column_names(df):
    return df.select([F.col(col).alias(re.sub('[^0-9a-zA-Z$]+','', col.title())) for col in df.columns])
  
df_spark_i94immigration = normalize_column_names(df_spark_i94immigration)
df_spark_worldtemperature = normalize_column_names(df_spark_worldtemperature)
df_spark_uscitydemographic = normalize_column_names(df_spark_uscitydemographic)
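
To make the effect of the regular expression concrete, here is a small sketch that applies the same expression to plain strings, without Spark. The helper name `normalize_name` is hypothetical and exists only for this illustration.

```python
import re

def normalize_name(name):
    # title-case each word, then drop every character that is not a letter, digit or '$'
    return re.sub('[^0-9a-zA-Z$]+', '', name.title())

examples = [
    'Foreign Born Population',
    'Black or African-American Population',
    'Average Household Size',
]
normalized = [normalize_name(name) for name in examples]
print(normalized)

# str.title() also lower-cases letters inside a word, so the function is not idempotent
print(normalize_name('CountryCitizenship'))
```

One consequence is that the function should be applied exactly once per DataFrame: running it again on an already normalized name such as `CountryCitizenship` would flatten it to `Countrycitizenship`.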

## 3. Define the Data Model
### 3.1 Conceptual Data Model
The data model is a star schema.

The fact table in the center of the star schema is the `immigration` table, in which each row represents a person immigrating into the US with an I-94 form. The dimension tables are `date`, `country` and `us_state`, with the foreign keys in the `immigration` table all using standardised formats.

A star schema was chosen as it is a simple, well-understood convention that captures all of the data within the scope of this project.

The diagram below was created on [https://dbdiagram.io](https://dbdiagram.io/d); its source code can be viewed in `/images/er-diagram/source`.

![ER Diagram](images/er-diagram/image.png)

### 3.2 Mapping Out Data Pipelines
Having completed the "Cleaning" step on the 3 core DataFrames, the next stage of the data pipeline is to generate the fact table and three dimension tables of the star schema.

1. `immigration` fact table: this is generated from `df_spark_i94immigration`, with a few columns renamed
2. `date` dimension table: this is generated on the fly from a date range of `2016-01-01` to `2016-12-31`
3. `us_state` dimension table: this is generated from `df_spark_uscitydemographic`, by rolling up such that there is one row per US state
4. `country` dimension table: this is generated from `df_spark_worldtemperature`, by rolling up such that there is one row per country

## 4: Run Pipelines to Model the Data 
### 4.1 Create the data model

#### 4.1.1 `immigration` fact table

In [46]:
df_spark_immigration_fact = df_spark_i94immigration \
    .withColumnRenamed('PortOfArrival', 'Port') \
    .withColumnRenamed('DateOfArrival', 'Date') \
    .withColumnRenamed('StateOfArrival', 'State')

In [47]:
df_spark_immigration_fact.limit(5).toPandas()

Unnamed: 0,Id,CountryCitizenship,CountryResidence,Port,Date,ModeOfTravel,State,DateOfDeparture,Age,VisaCategory,YearOfBirth,Gender,Airline,FlightNumber,VisaType
0,6,ECU,ECU,XXX,2016-04-29,,,,37,Pleasure,1979,,,,B2
1,7,,,ATL,2016-04-07,Air,AL,,25,Student,1991,M,,296.0,F1
2,15,ALB,ALB,WAS,2016-04-01,Air,MI,2016-08-25,55,Pleasure,1961,M,OS,93.0,B2
3,16,ALB,ALB,NYC,2016-04-01,Air,MA,2016-04-23,28,Pleasure,1988,,AA,199.0,B2
4,17,ALB,ALB,NYC,2016-04-01,Air,MA,2016-04-23,4,Pleasure,2012,,AA,199.0,B2


#### 4.1.2 `date` dimension table

In [48]:
# build the range from dates rather than epoch seconds, so that the result does not
# depend on the time zone of the Python process or of the Spark session
df_spark_date_dim = spark.sql(
    "SELECT explode(sequence(to_date('2016-01-01'), to_date('2016-12-31'), interval 1 day)) AS Date"
) \
    .withColumn('Day', F.dayofmonth(F.col('Date'))) \
    .withColumn('Month', F.month(F.col('Date'))) \
    .withColumn('Year', F.year(F.col('Date'))) \
    .withColumn('DayOfWeek', F.dayofweek(F.col('Date'))) \
    .withColumn('WeekOfYear', F.weekofyear(F.col('Date')))

In [49]:
df_spark_date_dim.limit(5).toPandas()

Unnamed: 0,Date,Day,Month,Year,DayOfWeek,WeekOfYear
0,2016-01-01,1,1,2016,6,53
1,2016-01-02,2,1,2016,7,53
2,2016-01-03,3,1,2016,1,53
3,2016-01-04,4,1,2016,2,1
4,2016-01-05,5,1,2016,3,1
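
The `DayOfWeek` and `WeekOfYear` values above may look surprising at first: Spark's `dayofweek` counts from 1 = Sunday, and `weekofyear` follows ISO 8601, so the first three days of 2016 belong to week 53 of 2015. The following plain-Python sketch reproduces the same columns with the standard library, which can be handy as an independent cross-check. The function name `date_dimension_rows` is hypothetical.

```python
from datetime import date, timedelta

def date_dimension_rows(start, end):
    """Yield (Date, Day, Month, Year, DayOfWeek, WeekOfYear) for each day in [start, end]."""
    current = start
    while current <= end:
        _, iso_week, iso_weekday = current.isocalendar()
        # Spark: 1 = Sunday ... 7 = Saturday; ISO: 1 = Monday ... 7 = Sunday
        spark_day_of_week = iso_weekday % 7 + 1
        yield (current, current.day, current.month, current.year, spark_day_of_week, iso_week)
        current += timedelta(days=1)

rows = list(date_dimension_rows(date(2016, 1, 1), date(2016, 12, 31)))
print(len(rows))
print(rows[0])
```

Since 2016 is a leap year, the dimension should contain 366 rows, which is also a cheap row-count check for the Spark table.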


#### 4.1.3 `us_state` dimension table

In [50]:
# we can't roll 'Median Age' up from city level to state level, 
# but we can roll up 'Average Household Size' if we calculate 'Number of Households' as an intermediate step
df_spark_uscitydemographic = df_spark_uscitydemographic \
    .withColumn('NumberOfHouseholds', (F.col('TotalPopulation') / F.col('AverageHouseholdSize')).cast(IntegerType()))

df_spark_us_state_dim = df_spark_uscitydemographic \
    .groupBy('StateCode') \
    .agg(
        *map(
            lambda x: F.first(x).alias(x),
            ['State'],
        ),
        *map(
            lambda x: F.sum(x).alias(x),
            filter(
                lambda y: y not in ['City', 'State', 'MedianAge', 'AverageHouseholdSize', 'StateCode'],
                df_spark_uscitydemographic.columns,
            ),
        ),
    ) \
    .withColumnRenamed('StateCode', 'Code') \
    .withColumnRenamed('State', 'Name') \
    .withColumn('AverageHouseholdSize', (F.col('TotalPopulation') / F.col('NumberOfHouseholds')).cast(FloatType())) \
    .sort(F.asc('Code'))

# ensure every state code has a row, even if no city in that state appears in the demographic data
df_spark_us_state_dim = df_spark_usstatesnames.select('Code').join(
    df_spark_us_state_dim,
    'Code',
    how='left'
) \
    .select(*df_spark_us_state_dim.columns)

In [51]:
df_spark_us_state_dim.limit(5).toPandas()

Unnamed: 0,Code,Name,MalePopulation,FemalePopulation,TotalPopulation,VeteranPopulation,ForeignBornPopulation,AmericanIndianAndAlaskaNativePopulation,AsianPopulation,BlackOrAfricanAmericanPopulation,HispanicOrLatinoPopulation,WhitePopulation,NumberOfHouseholds,AverageHouseholdSize
0,AL,Alabama,497248,552381,1049629,71543,52154,8084,28769,521068,39313,498920,443971,2.364184
1,AK,Alaska,152945,145750,298695,27492,33258,36339,36825,23107,27261,212696,107832,2.770003
2,AZ,Arizona,2227455,2272087,4499542,264505,682313,129708,229183,296222,1508157,3591611,1639715,2.7441
3,AR,Arkansas,286479,303400,589879,31704,62108,9381,22062,149608,77813,384733,238503,2.473256
4,CA,California,12278281,12544179,24822460,928270,7448257,401386,4543730,2047009,9856464,14905129,8330600,2.979672
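
The reason for the intermediate `NumberOfHouseholds` column is that an average of averages is only correct when every city has the same weight. A short worked example with two hypothetical cities (the numbers are made up for illustration) shows the difference:

```python
# Hypothetical cities in one state: (TotalPopulation, AverageHouseholdSize)
cities = [(1000, 2.0), (3000, 3.0)]

total_population = sum(population for population, _ in cities)
# int() truncates, mirroring the cast to IntegerType in the cell above
number_of_households = sum(int(population / size) for population, size in cities)

state_average = total_population / number_of_households
naive_average = sum(size for _, size in cities) / len(cities)

print(number_of_households, state_average, naive_average)
```

Here the state has 4000 people in 1500 households, so the correct state-level average is about 2.67, while the unweighted mean of the two city averages would give 2.5.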


#### 4.1.4 `country` dimension table

In [52]:
df_spark_country_dim = df_spark_worldtemperature \
    .groupBy('Country') \
    .agg(
        F.first('CountryName').alias('Name'),
        F.mean('AverageTemperature').alias('AverageTemperature')
    ) \
    .withColumnRenamed('Country', 'Code') \
    .sort(F.asc('Code'))

In [53]:
df_spark_country_dim.limit(5).toPandas()

Unnamed: 0,Code,Name,AverageTemperature
0,AFG,Afghanistan,14.537802
1,AGO,Angola,21.957661
2,ALB,Albania,15.875
3,ARE,United Arab Emirates (the),27.322581
4,ARG,Argentina,17.21662


#### 4.1.5 Persist model
Write the model as Parquet files to the local disk and to S3:

In [54]:
parquet_output_path = config['PARQUET']['OUTPUT_PATH']
s3_output_path = 's3a://' + config['S3']['OUTPUT_PATH']

for path in [parquet_output_path]:  # append s3_output_path to this list to also write to S3
    df_spark_immigration_fact.write.mode('overwrite').parquet(path + '/immigration.parquet')
    df_spark_date_dim.write.mode('overwrite').parquet(path + '/date.parquet')
    df_spark_us_state_dim.write.mode('overwrite').parquet(path + '/us_state.parquet')
    df_spark_country_dim.write.mode('overwrite').parquet(path + '/country.parquet')

### 4.2 Data Quality Checks
#### 4.2.1 Assert that all tables have data

In [55]:
filenames = ['immigration', 'date', 'us_state', 'country']

for filename in filenames:
    df = spark.read.parquet('./models/' + filename + '.parquet')
    assert df.count() > 0, 'table "{}" is empty'.format(filename)

#### 4.2.2 Assert foreign key constraints
The example below asserts that the `State` column in the `immigration` fact table is a valid foreign key into the `us_state` dimension table.

In [59]:
fact_immigration = spark.read.parquet('./models/immigration.parquet')
dim_us_state = spark.read.parquet('./models/us_state.parquet')

fact_immigration_states = list(fact_immigration.select('State').dropDuplicates().filter(F.col('State').isNotNull()).toPandas()['State'])
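# these are us_state codes (not countries); a set gives constant-time membership tests
dim_us_state_codes = set(dim_us_state.select('Code').toPandas()['Code'])

for state in fact_immigration_states:
    assert state in dim_us_state_codes, 'unknown state code: {}'.format(state)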

The rest of the foreign keys could be checked in a similar way.
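
As a rough sketch of how the other checks might share one helper, the function below reports every non-null fact value that has no matching dimension key, instead of stopping at the first failure. The name `missing_foreign_keys` and the sample values are hypothetical, and the sketch assumes the distinct values are few enough to collect to the driver, as in the cell above.

```python
def missing_foreign_keys(fact_values, dim_keys):
    """Return the sorted non-null fact values that have no match among the dimension keys."""
    dim_key_set = set(dim_keys)
    return sorted({value for value in fact_values if value is not None and value not in dim_key_set})

# Hypothetical values, standing in for the collected 'State' and 'Code' columns
fact_states = ['AL', 'MI', None, 'MA', 'MI', 'ZZ']
state_codes = ['AL', 'MA', 'MI']

orphans = missing_foreign_keys(fact_states, state_codes)
print(orphans)
```

The same call could then be repeated for `CountryCitizenship` and `CountryResidence` against the `country` codes, and for `Date` and `DateOfDeparture` against the `date` table, asserting each time that the returned list is empty.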

### 4.3 Data dictionary 
`immigration` fact table:

| Column             | Type   | Description |
| ------------------ | ------ | ----------- |
| Id                 | int    | Id (PK) |
| Port               | string | Port of Entry |
| State              | string | State of Entry (as: 2 character code) (FK to `us_state` table) |
| Date               | date   | Date of Entry (FK to `date` table) |
| DateOfDeparture    | date   | Date of Departure (FK to `date` table) |
| CountryCitizenship | string | Country of Citizenship of Immigrant (as: ISO-3166 Alpha-3) (FK to `country` table) |
| CountryResidence   | string | Country of Residence of Immigrant (as: ISO-3166 Alpha-3) (FK to `country` table) |
| ModeOfTravel       | string | Mode of Travel (as: Air, Sea, Land, Not Reported) |
| Age                | int    | Age of Immigrant (as: years) |
| YearOfBirth        | int    | Year of Birth of Immigrant |
| Gender             | string | Gender of Immigrant (as: M, F) |
| Airline            | string | Airline of Travel (if applicable) |
| FlightNumber       | string | Flight Number (if applicable) |
| VisaCategory       | string | Visa Category |
| VisaType           | string | Visa Type |


`date` dimension table:

| Column             | Type   | Description |
| ------------------ | ------ | ----------- |
| Date       | date | Date (PK) |
| Day        | int  | Day (of Month) |
| Month      | int  | Month |
| Year       | int  | Year |
| DayOfWeek  | int  | Day of Week |
| WeekOfYear | int  | Week of Year |


`country` dimension table:

| Column             | Type   | Description |
| ------------------ | ------ | ----------- |
| Code               | string | Country Code (as: ISO-3166 Alpha-3) (PK) |
| Name               | string | Country Name |
| AverageTemperature | double | Average Temperature of the Country between 1993 and 2013 |


`us_state` dimension table:

| Column                                  | Type   | Description |
| --------------------------------------- | ------ | ----------- |
| Code                                    | string | State Code (PK) |
| Name                                    | string | State Name |
| TotalPopulation                         | int    | Total Population of the State |
| MalePopulation                          | int    | Male Population of the State |
| FemalePopulation                        | int    | Female Population of the State |
| VeteranPopulation                       | int    | Veteran Population of the State |
| ForeignBornPopulation                   | int    | Foreign-born Population of the State |
| AmericanIndianAndAlaskaNativePopulation | int    | American Indian and Alaska Native Population of the State |
| AsianPopulation                         | int    | Asian Population of the State |
| BlackOrAfricanAmericanPopulation        | int    | Black or African-American Population of the State |
| HispanicOrLatinoPopulation              | int    | Hispanic or Latino Population of the State |
| WhitePopulation                         | int    | White Population of the State |
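| AverageHouseholdSize                    | float  | Average Household Size of the State |
| NumberOfHouseholds                      | int    | Number of Households in the State (Total Population / Average Household Size per city, summed over cities) |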

## 5: Further Questions
#### Clearly state the rationale for the choice of tools and technologies for the project.
- **[Apache Spark](https://spark.apache.org/)**: chosen as it is a high-performance, simple-to-use, industry-standard engine for data lake processing. It allowed performant code to be written with easy-to-read syntax, and it integrates with Jupyter out of the box, which suited a project such as this
- **[Parquet](https://parquet.apache.org/)**: the data model is stored as Parquet files, a standard format for Hadoop-based systems (such as Spark), since it provides high compression and high-performance encoding for columnar storage
- **[AWS S3](https://aws.amazon.com/s3/)**: the data model is stored in S3 (as well as locally) so that other cloud-based analysis tools can use it, for example by loading it into a [Redshift](https://aws.amazon.com/redshift/) cluster with an [Airflow](https://airflow.apache.org/) pipeline, as demonstrated in the "5 - Data Pipelines with Airflow" project

#### Propose how often the data should be updated and why.
The data should be updated every month, as the core data set is published as a monthly file.

#### Write a description of how you would approach the problem differently under the following scenarios:
##### The data was increased by 100x
The Spark cluster on which the ETL runs would need to be scaled horizontally by adding more nodes. Furthermore, it may make sense to process the data in smaller timesliced batches.
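
One possible way to time-slice the work, sketched here under the assumption that each batch covers one calendar month (matching the monthly source files), is to generate the batch boundaries up front and run the pipeline once per range. The function name `monthly_batches` is hypothetical.

```python
import calendar
from datetime import date

def monthly_batches(year):
    """Return a (first_day, last_day) pair for each calendar month of the given year."""
    batches = []
    for month in range(1, 13):
        last_day = calendar.monthrange(year, month)[1]
        batches.append((date(year, month, 1), date(year, month, last_day)))
    return batches

batches = monthly_batches(2016)
for first_day, last_day in batches[:3]:
    print(first_day, last_day)
```

Each pair could then be used to filter the fact data to a single month before writing, so that a failed batch can be re-run without reprocessing the whole year.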

##### The data populates a dashboard that must be updated on a daily basis by 7am every day
The pipeline could be coordinated by an Airflow instance running on the Spark cluster, scheduled to run daily and early enough that the run completes before 7am.

##### The database needed to be accessed by 100+ people
Since the data is persisted in S3, it can be accessed by as many people as necessary. Access to the S3 data itself can be managed through [AWS IAM](https://aws.amazon.com/iam/) roles, while access to the data via a 3rd-party tool (e.g., a [Tableau](https://www.tableau.com/) instance connected to the S3 bucket) can be controlled by that tool's own access controls.