In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars", "s3://gsc-udc-poc/spark-sas7bdat-3.0.0-s_2.12.jar").\
config("spark.jars.packages", "https://mvnrepository.com/artifact/com.epam/parso/2.0.12,org.apache.hadoop:hadoop-aws:2.7.0").\
enableHiveSupport().getOrCreate()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
32,application_1634329060496_0090,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
import datetime
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import DateType, IntegerType, StringType, LongType
from pyspark.sql.functions import udf, year , month, dayofweek, substring, date_format, col, when, count, avg, lower


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

##### Step 1: Scope the Project and Gather Data

Scope
The goal of this project is pull data from 6 different sources and create fact and dimension table to be able to do analysis on US immigration using factors of city monthly average temperature, city demographics.

Describe and Gather Data
I94 Immigration Data: comes from the U.S. National Tourism and Trade Office and contains various statistics on international visitor arrival in USA and comes from the US National Tourism and Trade Office. The dataset contains data from 2016. link
World Temperature Data: comes from Kaggle and contains average weather temperatures by city. link
U.S. City Demographic Data: comes from OpenSoft and contains information about the demographics of all US cities such as average age, male and female population.

Aditionally we will use States, Countries and Cities to match with records from the above sources.

In [3]:
# Read Demographics Data
us = spark.read.format("csv").option("header", "true").option("delimiter", ";").load("s3://gsc-udc-poc/us-cities-demographics.csv")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Read States
i94_addr = spark.read.format("csv").option("header", "true").option("delimiter", ",").load("s3://gsc-udc-poc/I94ADDR.csv")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Read Inmigration Data
inmigration = spark.read.parquet("s3://gsc-udc-poc/sas_data/")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Read Cities
cities = spark.read.format("csv").option("header", "true").option("delimiter", ",").load("s3://gsc-udc-poc/I94PORT.csv")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Read Countries
countries = spark.read.format("csv").option("header", "true").option("delimiter", ",").load("s3://gsc-udc-poc/I94CIT_I94RES.csv")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Read Temperatures by City
temperature = spark.read.format("csv").option("header", "true").option("delimiter", ",").load("s3://gsc-udc-poc/GlobalLandTemperaturesByCity.csv")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Step 2: Explore and Assess the Data

To familiarize ourselves with the data provided by Udacity we have done an exhaustive exploratory data analysis (EDA) checking what data would be useful and what preprocessing steps we should take in order to clean, organize and join the various datasets in a meaningful data model.

In the following sections we briefly describe the datasets provided and give a summarized idea on the reasons we took into consideration when deciding what data to use.



In [9]:
sc.install_pypi_package("pandas==0.25.1")
import pandas as pd

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas==0.25.1
  Using cached https://files.pythonhosted.org/packages/7e/ab/ea76361f9d3e732e114adcd801d2820d5319c23d0ac5482fa3b412db217e/pandas-0.25.1-cp37-cp37m-manylinux1_x86_64.whl
Collecting python-dateutil>=2.6.1 (from pandas==0.25.1)
  Using cached https://files.pythonhosted.org/packages/36/7a/87837f39d0296e723bb9b62bbb257d0355c7f6128853c78955f57342a56d/python_dateutil-2.8.2-py2.py3-none-any.whl
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-0.25.1 python-dateutil-2.8.2

### Demographic Data

This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey.

In [10]:
# CLEANING DEMOGRAPHIC DATA SET SUMMARY NOTED BY STEPS:

# 'Race' and 'Count' records are causing duplicate rows. We will separate them from US demographics data set and include 'City' and 'State Code'.
# 'Race' will be pivoted to column headers and saved to us_race_cnt dataset
# US dataset will be cleaned of duplicate rows (shown above).
# Cleaned US dataset will be joined back with us_race_cnt dataset to eventually have unique rows.
# replace null by 0 for gender columns
# create list of valid cities
# match only cities present in valid cities (I94PORT)

# Check us dataset for duplicate rows and which columns cause the duplicates
us.select("City", "State Code", "Median Age","Male Population","Female Population","Total Population", \
                  "Foreign-born","Average Household Size").orderBy("City").show(10, truncate=False)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------+----------+---------------+-----------------+----------------+------------+----------------------+
|City   |State Code|Median Age|Male Population|Female Population|Total Population|Foreign-born|Average Household Size|
+-------+----------+----------+---------------+-----------------+----------------+------------+----------------------+
|Abilene|TX        |31.3      |65212          |60664            |125876          |8129        |2.64                  |
|Abilene|TX        |31.3      |65212          |60664            |125876          |8129        |2.64                  |
|Abilene|TX        |31.3      |65212          |60664            |125876          |8129        |2.64                  |
|Abilene|TX        |31.3      |65212          |60664            |125876          |8129        |2.64                  |
|Abilene|TX        |31.3      |65212          |60664            |125876          |8129        |2.64                  |
|Akron  |OH        |38.1      |96886          |1

In [11]:
us_race_cnt=(us.select("City","State Code","Race","count")
    .groupby("City", "State Code")
    .pivot("Race")
    .agg({'City':'count'})).fillna({"American Indian and Alaska Native": 0,
                     "Asian": 0,
                     "Black or African-American": 0,
                     "Hispanic or Latino": 0,
                     "White": 0})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Comparing both datasets after dropping duplicate rows
(us_race_cnt.count(), us_race_cnt.dropDuplicates().count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(596, 596)

In [12]:
# Drop columns we don't need and drop duplicate rows
uscols=["Number of Veterans","Race","Count"]
us=us.drop(*uscols).dropDuplicates()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Finally saving (committing) joined US dataset
us=us.join(us_race_cnt, ["City","State Code"])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# Change `state code` column name to `state_code` and other similar problems to avoid parquet issues
us=us.select(col('City').alias('city'), col('State Code').alias('state_code'), col('Median Age').alias('median_age'),
                     col('Male Population').alias('male_population'), col('Female Population').alias('female_population'), 
                     col('Total Population').alias('total_population'), col('Foreign-born').alias('foreign_born'), 
                     col('Average Household Size').alias('avg_household_size'),
                     col('American Indian and Alaska Native').alias('native_population'), 
                     col('Asian').alias('asian_population'), 
                     col('Black or African-American').alias('black_population'), 
                     col('Hispanic or Latino').alias('latino_population'), 
                     col('White').alias('white_population'))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Drop the `state` column since state comes from Dim State
us=us.drop("State")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# Create list of valid cities
cities = cities.select(col('Code').alias('city_code'), col('City').alias('city'), col('State').alias('state_code'))
cities = cities.dropDuplicates()

valid_cities = {}
for index, row in cities.toPandas().iterrows():
    valid_cities[row[0]] = row[1]

@udf(StringType())
def city_to_port(city):
    for key in valid_cities:
        if city.lower() in valid_cities[key].lower():
            return key


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Convert City to Port and remove null values
us = us.withColumn("city_code", city_to_port(us["city"])).dropna(subset=['city_code'])
us.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+----------+----------+---------------+-----------------+----------------+------------+------------------+-----------------+----------------+----------------+-----------------+----------------+---------+
|           city|state_code|median_age|male_population|female_population|total_population|foreign_born|avg_household_size|native_population|asian_population|black_population|latino_population|white_population|city_code|
+---------------+----------+----------+---------------+-----------------+----------------+------------+------------------+-----------------+----------------+----------------+-----------------+----------------+---------+
|           Kent|        WA|      33.4|          61825|            65137|          126962|       38175|              3.06|                1|               1|               1|                1|               1|      FTK|
|   Fayetteville|        AR|      27.1|          41959|            40873|           82832|        6313|              2.2

### Immigration Data

For decades, U.S. immigration officers issued the I-94 Form (Arrival/Departure Record) to foreign visitors (e.g., business visitors, tourists and foreign students) who lawfully entered the United States. The I-94 was a small white paper form that a foreign visitor received from cabin crews on arrival flights and from U.S. Customs and Border Protection at the time of entry into the United States. It listed the traveler's immigration category, port of entry, data of entry into the United States, status expiration date and had a unique 11-digit identifying number assigned to it. Its purpose was to record the traveler's lawful admission to the United States.



In [18]:
# CLEANING INMIGRATION DATA SET SUMMARY NOTED BY STEPS:

# Create a list of valid_states
# Convert sas date to date
# Convert i94mode to a description
# US dataset will be cleaned of duplicate rows
# Convert Visa mode to a description
# Cast types for some columns
# Drop unused columns we don't need 

# create new list with Valid States
valid_states = i94_addr.select(["Code"]).toPandas()["Code"].unique()
print(valid_states)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['AL' 'AK' 'AZ' 'AR' 'CA' 'CO' 'CT' 'DE' 'DC' 'FL' 'GA' 'GU' 'HI' 'ID'
 'IL' 'IN' 'IA' 'KS' 'KY' 'LA' 'ME' 'MD' 'MA' 'MI' 'MN' 'MS' 'MO' 'MT'
 'NC' 'ND' 'NE' 'NV' 'NH' 'NJ' 'NM' 'NY' 'OH' 'OK' 'OR' 'PA' 'PR' 'RI'
 'SC' 'SD' 'TN' 'TX' 'UT' 'VT' 'VI' 'VA' 'WV' 'WA' 'WI' 'WY' '99']

In [19]:
def convert_sas_date(days):
    """
    Converts SAS date stored as days since 1/1/1960 to datetime
    :param days: Days since 1/1/1960
    :return: datetime
    """
    if days is None:
        return None
    return datetime.date(1960, 1, 1) + datetime.timedelta(days=days)

def convert_i94mode(mode):
    """
    Converts i94 travel mode code to a description
    :param mode: int i94 mode as integer
    :return: i94 mode description
    """
    if mode == 1:
        return "Air"
    elif mode == 2:
        return "Sea"
    elif mode == 3:
        return "Land"
    else:
        return "Not Reported"

def convert_visa(visa):
    """
    Converts visa numeric code to description
    :param visa: str
    :return: Visa description: str
    """
    if visa is None:
        return "Not Reported"
    elif visa == 1:
        return "Business"
    elif visa == 2:
        return "Pleasure"
    elif visa == 3:
        return "Student"
    else:
        return "Not Reported"


def validate_state(state):
    """
    Validate state
    :param state: str
    :return: state description: str
    """
    if state in valid_states:
        return state
    return 'other'


convert_sas_date_udf = F.udf(convert_sas_date, DateType())
convert_i94mode_udf = F.udf(convert_i94mode, StringType())
convert_visa_udf = F.udf(convert_visa, StringType())
validate_state_udf = F.udf(validate_state, StringType())

inmigration = inmigration.withColumn('age', inmigration['i94bir'].cast(IntegerType())) \
    .withColumn('i94cit', inmigration['i94cit'].cast(IntegerType())) \
    .withColumn('i94res', inmigration['i94res'].cast(IntegerType())) \
    .withColumn('arrival_date', convert_sas_date_udf(inmigration['arrdate']).cast(DateType())) \
    .withColumn('state_code', validate_state_udf(inmigration['i94addr']).cast(StringType())) \
    .withColumn('city_code', inmigration['i94port'].cast(StringType())) \
    .withColumn('biryear', inmigration['biryear'].cast(IntegerType())) \
    .withColumn('visa_mode', convert_i94mode_udf(inmigration['i94mode'])) \
    .withColumn('admnum', (inmigration['admnum'].cast(LongType()))) \
    .withColumn('visa_category', convert_visa_udf(inmigration['i94visa']))



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Drop columns we don't need and drop duplicate rows
inmigrationcols=["i94yr","i94mon","i94port","i94mode","i94addr","i94visa","arrival_date","count"]
inm_fact=inmigration.drop(*inmigrationcols).dropDuplicates()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Temperature by City


In [21]:
# CLEANING TEMPERATURES BY CITY DATA SET SUMMARY NOTED BY STEPS:

# Rename Columns
# Drop Duplicates
# Filter by US
# Rank based on City Partition

temperature = temperature.filter(temperature.AverageTemperature.isNotNull())
temperature = temperature.filter(temperature.Country == "United States") \
                 .withColumn("year", year(temperature['dt'])) \
                 .withColumn("month", month(temperature["dt"])) \
                 .withColumn("temperature", temperature["AverageTemperature"].cast("float")) \
                 .withColumn("city_code", city_to_port(temperature["City"])) \
                 .withColumn("rank", F.dense_rank().over(Window.partitionBy("city_code").orderBy(F.desc("dt")))) \
                 .dropna(how='any', subset=["city_code"])


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
temperature_dim = temperature.orderBy("city_code").dropDuplicates().drop('AverageTemperatureUncertainty', 'AverageTemperature', 'City')
temperature_dim.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-------------+--------+---------+----+-----+-----------+---------+----+
|        dt|      Country|Latitude|Longitude|year|month|temperature|city_code|rank|
+----------+-------------+--------+---------+----+-----+-----------+---------+----+
|2013-09-01|United States|  34.56N|  107.03W|2013|    9|     19.207|      ABQ|   1|
|2013-08-01|United States|  34.56N|  107.03W|2013|    8|      22.04|      ABQ|   2|
|2013-07-01|United States|  34.56N|  107.03W|2013|    7|     22.951|      ABQ|   3|
|2013-06-01|United States|  34.56N|  107.03W|2013|    6|     23.418|      ABQ|   4|
|2013-05-01|United States|  34.56N|  107.03W|2013|    5|      16.29|      ABQ|   5|
|2013-04-01|United States|  34.56N|  107.03W|2013|    4|     11.555|      ABQ|   6|
|2013-03-01|United States|  34.56N|  107.03W|2013|    3|      7.976|      ABQ|   7|
|2013-02-01|United States|  34.56N|  107.03W|2013|    2|      1.221|      ABQ|   8|
|2013-01-01|United States|  34.56N|  107.03W|2013|    1|     -1.981|      AB

In [23]:
# CLEANING COUNTRY DATA SET SUMMARY NOTED BY STEPS:

# Clean Countries with INVALID VALUES to VALID values
# Drop Duplicates

countries_dim = countries.withColumn("I94CTRY", when(col("I94CTRY") == "INVALID: AMERICAN SAMOA", "AMERICAN SAMOA").otherwise(col("I94CTRY"))) \
                 .withColumn("I94CTRY", when(col("I94CTRY") == "INVALID: ANTARCTICA", "ANTARCTICA").otherwise(col("I94CTRY"))) \
                 .withColumn("I94CTRY", when(col("I94CTRY") == "INVALID: CANADA", "CANADA").otherwise(col("I94CTRY"))) \
                 .withColumn("I94CTRY", when(col("I94CTRY") == "INVALID: FRENCH SOUTHERN AND ANTARCTIC", "FRENCH SOUTHERN AND ANTARCTIC").otherwise(col("I94CTRY"))) \
                 .withColumn("I94CTRY", when(col("I94CTRY") == "INVALID: GUAM", "GUAM").otherwise(col("I94CTRY"))) \
                 .withColumn("I94CTRY", when(col("I94CTRY") == "INVALID: GREENLAND", "GREENLAND").otherwise(col("I94CTRY"))) \
                 .withColumn("I94CTRY", when(col("I94CTRY") == "INVALID: UNITED STATES", "UNITED STATES").otherwise(col("I94CTRY"))) \
                 .withColumn("I94CTRY", when(col("I94CTRY") == "INVALID: U.S. VIRGIN ISLANDS", "VIRGIN ISLANDS").otherwise(col("I94CTRY"))) \

countries_dim = countries_dim.dropDuplicates()
countries_dim.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------------------------+
|Code|I94CTRY                  |
+----+-------------------------+
|163 |UZBEKISTAN               |
|414 |KIRIBATI                 |
|586 |HAITI                    |
|154 |GEORGIA                  |
|420 |TUVALU                   |
|117 |ITALY                    |
|273 |MALAYSIA                 |
|203 |LAOS                     |
|399 |EQUATORIAL GUINEA        |
|353 |TANZANIA                 |
|575 |COSTA RICA               |
|114 |HUNGARY                  |
|603 |GUYANA                   |
|760 |MAYOTTE (AFRICA - FRENCH)|
|386 |BENIN                    |
|209 |JAPAN                    |
|712 |ANTARCTICA               |
|300 |No Country Code (300)    |
|519 |DOMINICA                 |
|276 |SOUTH KOREA              |
+----+-------------------------+
only showing top 20 rows

### Step 3: Define the Data Model

### 3.1 Conceptual Data Model
Using fact and dimensional tables saved as parquet files we can implement them on any columnar database in Star Schema model. Star Schema model was chosen because it will be easier for Data Analysts and Data Scientists to understand and apply queries with best performance outcomes and flexibility.
Since the common data field from most of the datasets is city we will choose it for aggregations and also will be used to extrapolate the other data fields required for the data pipeline.

![star_schema.png](star_schema.png)



### 3.2 Mapping Out Data Pipelines

The pipeline steps are as follows:

Start Spark session

Import pyspark libraries

Load the datasets

Install pandas (pandas==0.25.1)

Clean demographic dataset (create gender columns, drop duplicates, match cities present in valid cities only)

Clean inmigration dataset (list of valid_states, convert i94mode and visa mode to a description, drop duplicates, Cast 
column types, drop unused columns)

Clean temperature by city dataframe (Rename Columns, Drop Duplicates, Filter by US, Rank based on city_code Partition, 
Convert City name to Port code)

Create Inmigration Fact table and save to parquet file

Create Date Dimension table and save to parquet file

Create Demographics Dimension table and save to parquet file

Create Countries Dimension table and save to parquet file

Create States Dimension table and save to parquet file

Create Temperatures Dimension table and save to parquet file

### Step 4: Run Pipelines to Model the Data 

### 4.1 Create the data model

In [24]:
# define output path
output_data = "s3a://gsc-udc-poc/capstone/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# create Date dimension
date_dim=inmigration.select(col('arrdate'),
                            col('arrival_date').alias('arrival_iso_date'),
                            date_format('arrival_date', 'y').alias('arrival_year'),
                            date_format('arrival_date', 'M').alias('arrival_month'),
                            date_format('arrival_date', 'd').alias('arrival_day'),
                            date_format('arrival_date', 'E').alias('arrival_dayofweek'))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# write date table to parquet files
date_dim.write.mode('overwrite').parquet(output_data+"/date_table")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
# write inmigration table to parquet files
inm_fact.write.mode('overwrite').parquet(output_data+"/inmigration_table")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
# write demographic table to parquet files
us.write.mode('overwrite').parquet(output_data+"/demographic_table")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
# write temperature by city table to parquet files
temperature_dim.write.mode('overwrite').parquet(output_data+"/temperature_table")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
# write country table to parquet files
countries_dim.write.mode('overwrite').parquet(output_data+"/country_table")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
# write state table to parquet files
state_dim = i94_addr.select(col('Code').alias('state_code'), col('State').alias('state_name'))
state_dim.write.mode('overwrite').parquet(output_data+"/state_table")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 4.2 Data Quality Checks

In [33]:
# Boto will be used to connect to S3 to read parquet tables
sc.install_pypi_package("boto3")
import boto3

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting boto3
  Downloading https://files.pythonhosted.org/packages/e0/a2/298aae95ef547b6003a32512d782ced9d3c78653b2219ee06afbad9c781e/boto3-1.19.10-py3-none-any.whl (131kB)
Collecting botocore<1.23.0,>=1.22.10 (from boto3)
  Downloading https://files.pythonhosted.org/packages/85/13/6346f79bc59c529f3b3de7d937763c04205b7c772313e0f6f183c08d29a4/botocore-1.22.10-py3-none-any.whl (8.1MB)
Collecting s3transfer<0.6.0,>=0.5.0 (from boto3)
  Using cached https://files.pythonhosted.org/packages/ab/84/fc3717a7b7f0f6bb08af593127171f08e3e0087c197922da09c01bfe7c3a/s3transfer-0.5.0-py3-none-any.whl
Collecting urllib3<1.27,>=1.25.4 (from botocore<1.23.0,>=1.22.10->boto3)
  Using cached https://files.pythonhosted.org/packages/af/f4/524415c0744552cce7d8bf3669af78e8a069514405ea4fcbd0cc44733744/urllib3-1.26.7-py2.py3-none-any.whl
Installing collected packages: urllib3, botocore, s3transfer, boto3
Successfully installed boto3-1.19.10 botocore-1.22.10 s3transfer-0.5.0 urllib3-1.26.7

In [38]:
s3_client = boto3.client('s3')
objects = s3_client.list_objects_v2(Bucket='gsc-udc-poc', Delimiter='/', Prefix ='capstone/')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Validate Schema (Fact & Dimension Tables)

In [39]:
for prefix in objects['CommonPrefixes']:
    table = prefix['Prefix'].split('/')[1]
    df = spark.read.parquet("s3://gsc-udc-poc/capstone/" + table)
    print("Table: " + table)
    schema = df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Table: country_table
root
 |-- Code: string (nullable = true)
 |-- I94CTRY: string (nullable = true)

Table: date_table
root
 |-- arrdate: double (nullable = true)
 |-- arrival_iso_date: date (nullable = true)
 |-- arrival_year: string (nullable = true)
 |-- arrival_month: string (nullable = true)
 |-- arrival_day: string (nullable = true)
 |-- arrival_dayofweek: string (nullable = true)

Table: demographic_table
root
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- median_age: string (nullable = true)
 |-- male_population: string (nullable = true)
 |-- female_population: string (nullable = true)
 |-- total_population: string (nullable = true)
 |-- foreign_born: string (nullable = true)
 |-- avg_household_size: string (nullable = true)
 |-- native_population: long (nullable = true)
 |-- asian_population: long (nullable = true)
 |-- black_population: long (nullable = true)
 |-- latino_population: long (nullable = true)
 |-- white_population: long (nulla

#### Validate Non Empty Tables (Fact & Dimension)

In [40]:
for prefix in objects['CommonPrefixes']:
    table = prefix['Prefix'].split('/')[1]
    df = spark.read.parquet("s3://gsc-udc-poc/capstone/" + table)
    record_num = df.count()
    if record_num <= 0:
        raise ValueError("This table is empty!")
    else:
        print("Table: " + table + f" is not empty: total {record_num} records.")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Table: country_table is not empty: total 289 records.
Table: date_table is not empty: total 3096313 records.
Table: demographic_table is not empty: total 180 records.
Table: inmigration_table is not empty: total 3096313 records.
Table: state_table is not empty: total 55 records.
Table: temperature_table is not empty: total 313024 records.

### 4.3 Data dictionary 

Please refer to data_dictionary.md

### Step 5: Complete Project Write Up

Rationale for the choice of tools and technologies for the project

Apache spark was used because of:
 * it's ability to handle multiple file formats with large amounts of data.
 * Apache Spark offers a lightning-fast unified analytics engine for big data.
 * Spark has easy-to-use APIs for operating on large datasets.

Propose how often the data should be updated and why.
 * The current I94 immigration data is updated monthly, therefore the data will be updated monthly.

Write a description of how you would approach the problem differently under the following scenarios:

The data was increased by 100x.
 * Spark can handle the increase but we would consider increasing the number of nodes in our cluster.
 
The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * In this scenario, Apache Airflow will be used to schedule and run data pipelines.
 
The database needed to be accessed by 100+ people.
 * In this scenario, we would move our analytics database into Amazon Redshift.