# Udacity Data Engineering Nanodegree
### Data Engineering Capstone Project

#### Project Summary
This project brings together all the subjects learned in the Data Engineering Nanodegree.

The aim of the project is to be able to perform analysis on US immigration data. After ETL, analysts should be able to find, for example, the amount of immigrants per state or city, the age distribution of immigrants, if weather / temperature is a factor for immigrants destination, etc. Below is a further explanation of the used data sets in this project. The used sets should be able to give interesting insights on the questions around immigration in the US and will prove valuable for fuirther analysis.
In the end, fact and dimension tables are designed and results are written to parquet files on S3.

The project follows the follow steps, each of which is explained in more detail below:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up


### First, let's do some necessary setups like importing libraries 

In [1]:
"""
Imports
"""
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType, DateType
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import pyspark.sql.functions as F
import pandas as pd


### Then, setup AWS credentials to store results on S3

In [2]:
"""
Setup and read in AWS credentials
"""
config = configparser.ConfigParser()
config.read('aws.cfg')

# AWS
os.environ['AWS_ACCESS_KEY_ID'] = config.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

# Set S3 bucket to write parquet files to, e.g.
# output_data = "s3a://udacity-dend-capstone-[...]/"
output_data = "/model_output/"


### We'll be using Spark for this project
Apache Spark is a unified analytics engine for large-scale data processing. It can be used interactively from, for example, Python and SQL shells. In other words, Spark is a framework that can quickly process task on big data sets. It's perfect for the task at hand.

In [3]:
"""
Creates a Spark session
"""
spark = SparkSession \
    .builder \
    .appName("Data Engineering Capstone Project") \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Below is an explanation of the data sets being used for this project. As mentioned, we'll be using Spark through both Python and SQL to perform ETL. 
We'll first do an exploration of the data sets, perform cleaning on the data, and write the data to new tables for further analysis. Per step below, more details are given on what is done and why.

#### Description of data sets
There are three datasets being used:
- I94 Immigration Data: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace. <a href="https://travel.trade.gov/research/reports/i94/historical/2016.html">This</a> is where the data comes from. There's a sample file so you can take a look at the data in csv format before reading it all in. You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
- World Temperature Data: This dataset came from Kaggle. You can read more about it <a href="https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data">here</a>.
- U.S. City Demographic Data: This data comes from OpenSoft. You can read more about it <a href="https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/">here</a>.

#### Read in the data

#### Immigration Data

In [4]:
# Read in immigration data
# immigration_data_folder = '../../data/18-83510-I94-Data-2016/*.sas7bdat'
immigration_data_path = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
immigration_df = spark.read.format('com.github.saurfang.sas.spark').load(immigration_data_path)

In [5]:
# Have a first look at the data
immigration_df.limit(5).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


#### Demographics Data

In [6]:
# Read in demographics data
demographic_data_path = 'us-cities-demographics.csv'
demographic_df = spark.read.csv(demographic_data_path, header = True, sep = ';')

In [7]:
# Have a first look at the data
demographic_df.limit(5).toPandas()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


#### Temperature Data

In [8]:
# Read in demographics data
temperature_data_path = '../../data2/GlobalLandTemperaturesByCity.csv'
temperature_df = spark.read.format("csv").option("delimiter", ",").option("header", "true").load(temperature_data_path)

In [9]:
# Have a first look at the data
temperature_df.limit(5).toPandas()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


### Step 2: Explore and Assess the Data
#### Explore and Clean the Data 
At first glance, we can see that some data sets have (a lot of) missing values. This will hinder our analysis, so we will have to clean up those records. There is also the possibility of duplicate data, which also needs to be addressed. Finally, there are time and / or date records, stored in `string` format, which needs to be cast to either a single value (e.g. `month`), or to a timestamp.

Per data set, below the necessary steps to clean the data have been documented.

### Immigration Data

In [10]:
# Count rows in immigration data set
immigration_count = immigration_df.count()
immigration_df.count()

3096313

In [11]:
# Print schema
immigration_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [12]:
# Let's fetch some data. List 10 airports
list_10 = immigration_df.select('i94port').distinct()
list_10.show(10)

+-------+
|i94port|
+-------+
|    FMY|
|    BGM|
|    HEL|
|    DNS|
|    MOR|
|    FOK|
|    HVR|
|    SNA|
|    PTK|
|    SPM|
+-------+
only showing top 10 rows



In [13]:
# Drop duplicates
immigration_df.dropDuplicates()

# Check for missing values
# This function takes one argument, a `DataFrame`
# It then counts all `NaN` and `Null` records, and returns a list in descending order
# By making a function of this, we can reuse this for other data sets
def count_missing_values(dataframe):
    df = dataframe.select([F.count(F.when(F.isnan(c) | F.isnull(c), c)).alias(c) for (c,c_type) in dataframe.dtypes if c_type not in ('timestamp', 'date')]).toPandas()
    return df.rename(index = {0: 'count'}).T.sort_values("count", ascending = False)

# Count by calling the function above
counted_missing = count_missing_values(immigration_df)
count_missing_values(immigration_df)

Unnamed: 0,count
entdepu,3095921
occup,3088187
insnum,2982605
visapost,1881250
gender,414269
i94addr,152592
depdate,142457
matflag,138429
entdepd,138429
airline,83627


We see that some columns contain a lot of missing values (over 95%). This is too much, so we will be removing these records.

In [14]:
# Drop columns with too many nulls        
cleaned_immigration_df = immigration_df.drop('entdepu', 'occup', 'insnum')
count_missing_values(cleaned_immigration_df)

Unnamed: 0,count
visapost,1881250
gender,414269
i94addr,152592
depdate,142457
entdepd,138429
matflag,138429
airline,83627
fltno,19549
i94bir,802
biryear,802


In [15]:
# Drop rows with nulls (`dropna()` defaults to axis=0, e.g. the rows)...
cleaned_immigration_df = cleaned_immigration_df.dropna()

# ... and do a final check for missing values. This should return all 0's now
count_missing_values(cleaned_immigration_df)

Unnamed: 0,count
cicid,0
dtadfile,0
fltno,0
admnum,0
airline,0
gender,0
dtaddto,0
biryear,0
matflag,0
entdepd,0


In [16]:
# It looks like we've cleaned out the missing values. Let's see the result
# Count cleaned-up dataframe
print("Original records: {}. Records after cleaning: {}. (Percentage dropped: {})".format(immigration_df.count(), cleaned_immigration_df.count(), 100 - ((cleaned_immigration_df.count() / immigration_df.count()) * 100)))

Original records: 3096313. Records after cleaning: 1058629. (Percentage dropped: 65.8100133933488)


In [17]:
# Create timestamp column from original "arrdate" column
# This function takes one argument and casts it to a time format
get_timestamp = udf(
    lambda x: datetime.fromtimestamp(x / 1000), TimestampType()
)

# Create staging table (including new column for arr_month)
cleaned_immigration_df = cleaned_immigration_df \
    .withColumn("arr_date", get_timestamp(cleaned_immigration_df["arrdate"])) \
    .withColumn("arr_month", month("arr_date"))

staging_immigration = cleaned_immigration_df.select(
                                "arr_date",
                                "arr_month",
                                col("biryear").cast("int").alias("birth_year"),
                                col("gender").alias("gender"),
                                col("visatype").alias("visa_type"),
                                col("i94addr").alias("state")).distinct()

staging_immigration.createOrReplaceTempView("staging_immigration")

# To write table to parquet file, uncomment the line below
# staging_immigration.write.partitionBy('state', 'arr_date', 'arr_month').parquet(output_data + "immigration/staging_immigration.parquet", "overwrite")

In [18]:
# Print the cleaned schema
staging_immigration.printSchema()

root
 |-- arr_date: timestamp (nullable = true)
 |-- arr_month: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- state: string (nullable = true)



In [19]:
# And have a quick look at the cleaned data
staging_immigration.limit(3).toPandas()

Unnamed: 0,arr_date,arr_month,birth_year,gender,visa_type,state
0,1970-01-01 00:00:20.545,1,1980,F,B2,PA
1,1970-01-01 00:00:20.545,1,1993,M,B2,AZ
2,1970-01-01 00:00:20.545,1,1979,M,B1,CA


### Demographics data

In [20]:
# Count rows in immigration data set
demographic_df.count()

2891

In [21]:
# Print schema
demographic_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)



In [22]:
# Check for missing values, by calling the same function as before
count_missing_values(demographic_df)

# Alternatively, we could also do something like:
# demographic_df.toPandas().isnull().sum().sort_values(ascending = False)

Unnamed: 0,count
Average Household Size,16
Number of Veterans,13
Foreign-born,13
Male Population,3
Female Population,3
City,0
State,0
Median Age,0
Total Population,0
State Code,0


In [23]:
# Drop rows with missing values
# Seeing the amount is very low, we will not be dropping complete columns here
demographic_df.dropDuplicates()
cleaned_demographic_df = demographic_df.dropna()

In [24]:
# Do a new count of the data set after cleaning
print("Original records: {}. Records after cleaning: {}. (Percentage dropped: {})".format(demographic_df.count(), cleaned_demographic_df.count(), 100 - ((cleaned_demographic_df.count() / demographic_df.count()) * 100)))

Original records: 2891. Records after cleaning: 2875. (Percentage dropped: 0.5534417156693223)


In [25]:
# Create staging table and appropriately name columns
staging_demographic = cleaned_demographic_df.select(
                                col("City").alias("city_name"),
                                col("Median Age").cast("float").alias("median_age"),
                                col("Male Population").cast("int").alias("male_population"),
                                col("Female Population").cast("int").alias("female_population"),
                                col("Total Population").cast("int").alias("total_population"),
                                col("Number of Veterans").cast("int").alias("num_of_veterans"),
                                col("Foreign-born").alias("foreign_born"),
                                col("Average Household Size").cast("float").alias("avg_household_size"),
                                col("State Code").alias("state_code"),
                                col("State").alias("state"),
                                col("Race").alias("race")).distinct()
    
staging_demographic.createOrReplaceTempView("staging_demographic")

# Write table to parquet file
# staging_demographic.write.partitionBy('state', 'city_name').parquet(output_data + "demographics/staging_demographic.parquet", "overwrite")

In [26]:
# Print the cleaned up schema
staging_demographic.printSchema()

root
 |-- city_name: string (nullable = true)
 |-- median_age: float (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- num_of_veterans: integer (nullable = true)
 |-- foreign_born: string (nullable = true)
 |-- avg_household_size: float (nullable = true)
 |-- state_code: string (nullable = true)
 |-- state: string (nullable = true)
 |-- race: string (nullable = true)



In [27]:
# And have a quick look at the cleaned data
staging_demographic.limit(3).toPandas()

Unnamed: 0,city_name,median_age,male_population,female_population,total_population,num_of_veterans,foreign_born,avg_household_size,state_code,state,race
0,Seattle,35.5,345659,338784,684443,29364,119840,2.13,WA,Washington,Hispanic or Latino
1,Boca Raton,47.299999,44760,48466,93226,4367,21117,2.22,FL,Florida,Asian
2,Toledo,36.099998,135455,144323,279778,15286,9257,2.29,OH,Ohio,Hispanic or Latino


### Temperature data

In [28]:
# Count rows in immigration data set
temperature_df.count()

8599212

In [29]:
# Print schema
temperature_df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: string (nullable = true)
 |-- AverageTemperatureUncertainty: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [30]:
# Check for missing values
count_missing_values(temperature_df)

Unnamed: 0,count
AverageTemperature,364130
AverageTemperatureUncertainty,364130
dt,0
City,0
Country,0
Latitude,0
Longitude,0


In [31]:
# The amount of missing values is very low, so we can safely drop these
temperature_df.drop_duplicates()
cleaned_temperature_df = temperature_df.dropna()

In [32]:
# Do a new count of the data set after cleaning
print("Original records: {}. Records after cleaning: {}. (Percentage dropped: {})".format(temperature_df.count(), cleaned_temperature_df.count(), 100 - ((cleaned_temperature_df.count() / temperature_df.count()) * 100)))

Original records: 8599212. Records after cleaning: 8235082. (Percentage dropped: 4.234457761943773)


In [33]:
# We can even further filter the data, since we will only be looking at `Country = United States`
cleaned_temperature_df = cleaned_temperature_df.filter(cleaned_temperature_df['Country'] == 'United States')
cleaned_temperature_df.count()

661524

In [34]:
# Clean temperature data
# First, create new columns for `year` and `month`
cleaned_temperature_df = cleaned_temperature_df \
    .withColumn("year", year(cleaned_temperature_df['dt']).cast("int")) \
    .withColumn("month", month(cleaned_temperature_df["dt"]).cast("int")) \
    .dropna()

# Check date range of temperature records
# Min and Max years
print("Min year in data set:")
cleaned_temperature_df.select(cleaned_temperature_df['year']).distinct().orderBy(cleaned_temperature_df['year'], ascending=True).show(1)

print("Max year in data set:")
cleaned_temperature_df.select(cleaned_temperature_df['year']).distinct().orderBy(cleaned_temperature_df['year'], ascending=False).show(1)

Min year in data set:
+----+
|year|
+----+
|1743|
+----+
only showing top 1 row

Max year in data set:
+----+
|year|
+----+
|2013|
+----+
only showing top 1 row



We have to take note of the fact that the max year of the data set is 2013. This can be of influence on the final data / analysis, depending on the desired (or expected) outcomes. It's important to keep this in mind. For example, one can choose to AVG() over the 10 most recent years, per month.

In [35]:
# For now, we will only use the most recent year of data, which is 2013
cleaned_temperature_df = cleaned_temperature_df.filter(cleaned_temperature_df['year'] == '2013')

# Staging table, where we cast `avg_temperature` to a float, instead of string
staging_temperature = cleaned_temperature_df.select(
                                col("year"),
                                col("month"),
                                col("AverageTemperature").cast("float").alias("avg_temperature"),
                                col("City").alias("city"),
                                col("Country").alias("country")).drop_duplicates()

staging_temperature.createOrReplaceTempView("staging_temperature")

# Optionally, we could write this table to parquet file
# staging_weather.write.partitionBy('city').parquet(output_data + "weather/staging_weather", "overwrite")

In [36]:
# Print the cleaned schema
staging_temperature.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- avg_temperature: float (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)



In [37]:
# And have a quick look at the cleaned data
staging_temperature.limit(3).toPandas()

Unnamed: 0,year,month,avg_temperature,city,country
0,2013,1,4.14,Amarillo,United States
1,2013,2,10.199,Carrollton,United States
2,2013,1,-2.751,Detroit,United States


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Below is a `star schema` with fact and dimension tables. A star schema is chosen because it is efficient in handling basic queries. A star schema is characterised by having a single fact table in the middle, surrounded by multiplpe dimension tables. This setup allows for easy queries, for exmaple by joining the tables.

The schema below shows how the various tables can be joined. For example, the `immigrants` and `temperature` tables can be joined on `state` and `city`, allowing for analysis on if and how weather / temperature plays a role in migration to the US.

<img src="star_schema.jpeg" alt="STAR Schema">

#### 3.2 Mapping Out Data Pipelines
The following steps are necessary to pipeline the data into the above data model

- Clean the data (drop missing values, cast data types, etc) - See above
- For each of the tables in the above model, the following steps have to be performed:
  - Extract columns from the staging tables created above
  - Rename columns
  - Create new tables
  - Write parquet files
  - Perform data quality checks

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Using `spark.sql`, we will create the tables

In [38]:
# Extract columns from staging tables to create fact and dimension tables
# Cities table
cities_table = spark.sql("""
    SELECT monotonically_increasing_id() as id, 
            sd.state_code as state,
            sd.city_name as city
    FROM staging_demographic sd
""")

cities_table.createOrReplaceTempView("cities_table")

# Write cities table to parquet files partitioned by state and city
cities_table.write.partitionBy('state', 'city').parquet(output_data + "tables/cities_table.parquet", "overwrite")

In [39]:
# Immigrants table
immigrants_table = spark.sql("""
    SELECT monotonically_increasing_id() as id, 
            si.arr_date as arr_date,
            si.arr_month as arr_month,
            si.birth_year as birth_year,
            si.gender as gender,
            si.visa_type as visa_type,
            si.state as state
    FROM staging_immigration si
""")

immigrants_table.createOrReplaceTempView("immigrants_table")

# Write cities table to parquet files partitioned by city and month
immigrants_table.write.partitionBy('state', 'arr_month').parquet(output_data + "tables/immigrants_table.parquet", "overwrite")

In [50]:
# Temperature table
# Note that for testing purposes only, an extra `WHERE` clause can be added to filter data on e.g. a single `city`
temperature_table = spark.sql("""
    SELECT monotonically_increasing_id() as id, 
            st.city as city,
            st.avg_temperature as avg_temperature,
            st.country as country,
            st.year as year,
            st.month as month
    FROM staging_temperature st
""")

temperature_table.createOrReplaceTempView("temperature_table")

# Write cities table to parquet files partitioned by city and month
temperature_table.write.partitionBy('city', 'month').parquet(output_data + "tables/temperature_table.parquet", "overwrite")

In [44]:
# City_info table
city_info_table = spark.sql("""
    SELECT monotonically_increasing_id() as id, 
            sd.city_name as city,
            sd.median_age as median_age,
            sd.male_population as male_population,
            sd.female_population as female_population,
            sd.total_population as total_population,
            sd.num_of_veterans as num_of_veterans,
            sd.foreign_born as foreign_born,
            sd.avg_household_size as avg_household_size,
            sd.state_code as state,
            sd.race as race
    FROM staging_demographic sd
""")

city_info_table.createOrReplaceTempView("city_info_table")

# Write cities table to parquet files partitioned by state_code and city
city_info_table.write.partitionBy('state', 'city').parquet(output_data + "tables/city_info_table.parquet", "overwrite")

#### 4.2 Data Quality Checks
To ensure the pipeline ran as expected, we will have to perform some quality checks. We will check if the tables exist, and if the contain any values.
 
We will be running two Quality Checks in the loop below:

In [55]:
# Create array of tables to loop over and see if they exist
array_of_tables = [cities_table, immigrants_table, temperature_table, city_info_table]
cities_table.name = "cities_table"
immigrants_table.name = "immigrants_table"
temperature_table.name = "temperature_table"
city_info_table.name = "city_info_table"

# Loop over tables
for table in array_of_tables:
    if table is not None:
        count = table.count()
        if count != 0:
            print("Quality check passed! Table '{}' has been created. Containing {} records.".format(table.name, count))
        else:
            print("Error found! Table '{}' has been created, but is empty..".format(table))
    else:
        print("Error! Table {} not found!".format(table))

Quality check passed! Table 'cities_table' has been created. Containing 2875 records.
Quality check passed! Table 'immigrants_table' has been created. Containing 199050 records.
Quality check passed! Table 'temperature_table' has been created. Containing 9 records.
Quality check passed! Table 'city_info_table' has been created. Containing 2875 records.


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

| cities        |            | 
| ------------- |---------------| 
| id      | Unique identifier | 
| city      | Name of the US city       | 
| state     | Two letter name of the US state      |


| immigrants        |            | 
| ------------- |---------------| 
| id      | Unique identifier | 
| arr_date      | Immigrant arrival date in the US       | 
| arr_month     | Arrival month, based on date      |
| birth_year | Immigrant year of birth |
| gender | Immigrants' gender |
| visa_type | Visa type used to enter the US |
| state | State of immigration |


| temperature        |            | 
| ------------- |---------------| 
| id      | Unique identifier | 
| city | Name of the city |
| avg_temperature | Average temperature measured per month |
| country | Name of the country |
| date | Date |
| month | Month of the year (integer, from date) |
| state | State |


| city_info        |            | 
| ------------- |---------------| 
| id      | Unique identifier | 
| city | Name of the city |
| median_age | Median age of inhabitants of specific city |
| male_population | Amount of male inhabitants |
| female_population | Amount of female inhabitants |
| total_population | Amount of inhabitants |
| num_of_veterans | Amount of veterans in city |
| foreign_born | Amount of inhabitants not born in city |
| avg_household_size | Average household size in city |
| state | State |
| race | Race of respondent|

#### Step 5: Complete Project Write Up

#### Rationale for the choice of tools and technologies for this project.
Apache Spark is a unified analytics engine for large-scale data processing. It can be used interactively from, for example, Python and SQL shells. In other words, Spark is a framework that can quickly process task on big data sets. Spark works very well with different file formats, as we have encountered here. It is fast and reliable, and allows for easy adding of additional recources if needed, for example when the work load increases in the future. Finally, Spark integrates well with AWS, which is used to store the output files (in S3).

All data is parionend by `state` for easier analysis. This allows for analysis on specific states, comparing them if wanted, and seeing trends over time per state, city and between them.

#### How often should the data be updated and why?

Seeing that we our smallest unit of measure in this project is per month, we won;t need to update the data every hour. However, it is not always advised to only update once a month, beceause this can lead to significant loading times. Of course, this is highly dependent on the availability of the data. We should aim at updating at least once a month, preferably twice a month, if new data is available. If no new data is available, there is no need to update.

#### Description of how to approach the problem differently under the following scenarios:
 * _The data was increased by 100x._ \
   This would potentially increase loading times, however Spark is perfectly able to handle this. If a lot of new data is added however, this may increase the need for making further decisions on which data is needed, and which data can be omitted. However, strictly speaking Spark can handle an increase by adding more worker nodes to the cluster.

   
 * _The data populates a dashboard that must be updated on a daily basis by 7am every day._ \
   We will have to make sure that all steps in the proces run as expected. An obvous candidate for this is using Apache Airflow, which allows us to specify a DAG that controls the tasks that need to run. We will get notified when a task fails. 
 
 
 
 
 * _The database needed to be accessed by 100+ people._ \
   Running the project locally is no option in this case. Since we are already connected to AWS, Redshift might seem an obvious choice to host the project.