# Project Title
### Data Engineering Capstone Project

#### Project Summary
The Capstone project focuses on creating a Data Warehouse for retaining the I-94 immigration records to provide different insights on the demographics of visitors entering the United States.
The project implements a dimensional Data Model for the Immigration, Cities and Airport codes data sets and also builds out an ETL pipeline to load the data into AWS Redshift.
A star schema is created with dimensional modeling with a fact table for the immigration data by airports/US cities.

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import configparser
from datetime import datetime
import os
import boto3
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, desc
import pyspark.sql.functions as F
from datetime import datetime
from pyspark.sql import types as T

 ### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

Scope - The project focuses on :
  (i)  Immigration data only for US. 
  (ii) Immigration data for a span of 4 years (2014-2018)

Data Sources
------------
1. immigration_data_sample.csv - This data set contains immigration information for visitors like - i94 ID, arrival_Date, departure_date,
visa_type, gender, birth_year, admsn_no, flight_no, airline

2. us-cities-demographics.csv - This data set contains the demographic information for US cities like - race, gender, ethnicity, total population, total number of males/females in the city etc 

3. airport-codes.csv - This data set contains the airport codes, airport name, type of airport,  region and country to which the airport belongs.


Data Model 
----------

Dimensions :
-----------
1. dim_airport  - Dimension table for airport codes ( PK - airport_code)

Columns
------
ident
name
country
region
co-ordinates

2. dim_city_demog - Dimension table for city demographics ( PK = city name) 

Columns
------
city
state
state_cd
median_Age
male_population
female_population
Race

3. dim_visitors - Dimension table with immigration data for US visitors ( PK = I94 admission num)

Columns
------
admn_num
visa_type
transport_mode
arr_Date
dep_date
birth_yr
gender
port_of_entry
age (calculated field)


Fact
----
4. fact_immigrations - Fact table which records the US visitors coming in by airport/cities.

The ETL Data Pipeline is built using Python commands and the Data Warehouse is built in AWS.
The data sources are staged into AWS S3 buckets, cleansed and loaded into Redshift database.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

1. immigration_data_sample.csv - This data comes from the US National Tourism and Trade Office. The dataset contains international visitor arrival statistics by world regions and select countries, type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry for select countries 

2. us-cities-demographics.csv - This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey

3. airport-codes.csv - This is a simple table of airport codes and corresponding cities. The airport codes may refer to either IATA airport code, a three-letter code which is used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code which is a four letter code used by ATC systems and for airports that do not have an IATA airport code


In [2]:
# Read in the data here

# read immigration data file
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")
#print(df.count())

#read us-cities dataset
fname = '/home/workspace/us-cities-demographics.csv'
df_city = pd.read_csv(fname, delimiter = ";")

#read airport-codes dataset
fname = '/home/workspace/airport-codes_csv.csv'
df_airport = pd.read_csv(fname)


In [3]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [4]:
df_city.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [5]:
df_airport.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [6]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['KEY']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['SECRET']



In [7]:
	
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [8]:
#write to parquet
#df_spark.write.parquet("sas_data1")
#df_spark=spark.read.parquet("sas_data1")

df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [9]:
# Performing cleaning tasks here

#Drop duplicates and NaN values from the dataframe
df_spark.dropna()
df_spark.dropDuplicates()
df_spark.show(5)


+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

--> The immigration data will be aggregated by city/year

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Extract the data files, cleanse it and write it as parquet files into S3 buckets for each of the tables - dimensions/facts
2. Load the data from S3 buckets into AWS Redshift database.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [10]:
# Write code here

# create age column from original biryear column for Immigration dataset
df_spark = df_spark.withColumn("age", (df_spark['i94yr'] - df_spark['biryear']))
df_spark.printSchema()
df_spark.show(5)

#Convert arrdate into date datatype in immigration file

# create datetime columns for arrival and departure times 
get_datetime = udf(lambda x: datetime.fromtimestamp(x), T.TimestampType())
df_spark = df_spark.withColumn("arrdatetime",get_datetime(df_spark['arrdate']))
#df_spark = df_spark.withColumn("depdatetime",get_datetime(df_spark['depdate']))
df_spark.show(5)
df_spark.printSchema()



root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [11]:
## Read immigration data file and extract columns to create dim_visitors table
visitors_table = df_spark['cicid', 'i94yr', 'i94mon', 'i94port', 'i94addr','arrdatetime', 'depdate', 'i94visa', 'biryear','gender', 'airline', 'admnum', 'fltno', 'visatype', 'age']
#print(visitors_table)
#visitors_table.show(5)
visitors_table.createOrReplaceTempView("dim_visitors")

spark.sql("SELECT * from dim_visitors").show()

+-----+------+------+-------+-------+-------------------+-------+-------+-------+------+-------+--------------+-----+--------+----+
|cicid| i94yr|i94mon|i94port|i94addr|        arrdatetime|depdate|i94visa|biryear|gender|airline|        admnum|fltno|visatype| age|
+-----+------+------+-------+-------+-------------------+-------+-------+-------+------+-------+--------------+-----+--------+----+
|  6.0|2016.0|   4.0|    XXX|   null|1970-01-01 05:42:53|   null|    2.0| 1979.0|  null|   null| 1.897628485E9| null|      B2|37.0|
|  7.0|2016.0|   4.0|    ATL|     AL|1970-01-01 05:42:31|   null|    3.0| 1991.0|     M|   null|  3.73679633E9|00296|      F1|25.0|
| 15.0|2016.0|   4.0|    WAS|     MI|1970-01-01 05:42:25|20691.0|    2.0| 1961.0|     M|     OS|  6.66643185E8|   93|      B2|55.0|
| 16.0|2016.0|   4.0|    NYC|     MA|1970-01-01 05:42:25|20567.0|    2.0| 1988.0|  null|     AA|9.246846133E10|00199|      B2|28.0|
| 17.0|2016.0|   4.0|    NYC|     MA|1970-01-01 05:42:25|20567.0|    2.0| 20

In [12]:
## Read city demographics data file and extract columns to create dim_city_demog table

df_city = spark.read.option("delimiter", ";").csv('/home/workspace/us-cities-demographics.csv')
df_city.printSchema()

#Remove first row/header
#df_city = df_city.filter(df['_c0'] == "City")
#df_city.show(5)

#city_table = df_city['city', 'state', 'state code', 'median age', 'Male Population', 'Female Population', 'Total Population', 'Foreign-born']
city_table = df_city['_c0', '_c1', '_c9', '_c2', '_c3', '_c4', '_c5', '_c7']
print(city_table)

#Remove first row/header
city_table = city_table.filter(city_table['_c0'] != 'City')

city_table.show(5)
df_city.createOrReplaceTempView("dim_city_demog")


root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)

DataFrame[_c0: string, _c1: string, _c9: string, _c2: string, _c3: string, _c4: string, _c5: string, _c7: string]
+----------------+-------------+---+----+------+------+------+-----+
|             _c0|          _c1|_c9| _c2|   _c3|   _c4|   _c5|  _c7|
+----------------+-------------+---+----+------+------+------+-----+
|   Silver Spring|     Maryland| MD|33.8| 40601| 41862| 82463|30908|
|          Quincy|Massachusetts| MA|41.0| 44129| 49500| 93629|32935|
|          Hoover|      Alabama| AL|38.5| 38040| 46799| 84839| 8229|
|Rancho Cucamonga|   California| CA|34.5| 88

In [13]:
## Read airport codes data file and extract columns to create dim_airport table

df_airport = spark.read.csv('/home/workspace/airport-codes_csv.csv')
df_airport.printSchema()
df_airport.show(5)

#airport_table = df_airport['ident', 'type', 'name', 'iso_country', 'iso_region', 'municipality', 'local_code']
airport_table = df_airport['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10', '_c11']
print(airport_table)

#Remove first row/header
airport_table = airport_table.filter(airport_table['_c0'] != 'ident')
airport_table.head()

airport_table.show(5)
df_airport.createOrReplaceTempView("dim_airport")


root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  _c0|          _c1|                 _c2|         _c3|      _c4|        _c5|       _c6|         _c7|     _c8|      _c9|      _c10|                _c11|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iat

In [14]:
# Join the tables to create FACT table

fact_immigrations_df = spark.sql("""
SELECT a.i94yr, a.i94mon, a.i94port, a.i94addr, a.arrdatetime, a.visatype, a.gender, a.age, 
b._c0 as City, b._c1 as State, b._c9 as State_CD, b._c2 as Median_Age, b._c3 as Male_Population, 
b._c4 as Female_Population, b._c5 as Total_Population, b._c7 as Foreign_Born, c._c0 as Airport_Code,
c._c2 as Airport_Name, c._c6 as Airport_Region
FROM dim_visitors a
JOIN dim_city_demog b ON (a.i94addr = b._c9)
JOIN dim_airport c ON (a.i94addr = substr(c._c6,4,2))
""")
    
fact_immigrations_df.limit(5).show()
#fact_immigrations_df.limit(5)
#print(fact_immigrations_df)

+------+------+-------+-------+-------------------+--------+------+----+----------+-------+--------+----------+---------------+-----------------+----------------+------------+-------+-------------+--------------------+----+---+---+-----+--------------+----+----+----+--------------------+
| i94yr|i94mon|i94port|i94addr|        arrdatetime|visatype|gender| age|      City|  State|State_CD|Median_Age|Male_Population|Female_Population|Total_Population|Foreign_Born|    _c0|          _c1|                 _c2| _c3|_c4|_c5|  _c6|           _c7| _c8| _c9|_c10|                _c11|
+------+------+-------+-------+-------------------+--------+------+----+----------+-------+--------+----------+---------------+-----------------+----------------+------------+-------+-------------+--------------------+----+---+---+-----+--------------+----+----+----+--------------------+
|2016.0|   4.0|    ATL|     AL|1970-01-01 05:42:31|      F1|     M|25.0|Birmingham|Alabama|      AL|      35.6|         102122|      

In [16]:
fact_immigrations_df.createOrReplaceTempView("fact_immigration")

fact_immigrations_df.dropDuplicates()
fact_immigrations_df.show(5)

+------+------+-------+-------+-------------------+--------+------+----+----------+-------+--------+----------+---------------+-----------------+----------------+------------+-------+-------------+--------------------+----+---+---+-----+--------------+----+----+----+--------------------+
| i94yr|i94mon|i94port|i94addr|        arrdatetime|visatype|gender| age|      City|  State|State_CD|Median_Age|Male_Population|Female_Population|Total_Population|Foreign_Born|    _c0|          _c1|                 _c2| _c3|_c4|_c5|  _c6|           _c7| _c8| _c9|_c10|                _c11|
+------+------+-------+-------+-------------------+--------+------+----+----------+-------+--------+----------+---------------+-----------------+----------------+------------+-------+-------------+--------------------+----+---+---+-----+--------------+----+----+----+--------------------+
|2016.0|   4.0|    ATL|     AL|1970-01-01 05:42:31|      F1|     M|25.0|Birmingham|Alabama|      AL|      35.6|         102122|      

In [17]:
# write visitors table to parquet files partitioned by year and city
visitors_table.write.mode("overwrite").partitionBy("i94yr").parquet('dim_visitors.pq')
#visitors_table.write.mode("overwrite").partitionBy("i94yr").parquet('s3://udacity-capstone-proj/dim_visitors/')

# write airport table to parquet files 
airport_table.write.mode("overwrite").parquet('dim_airport.pq')

# write city table to parquet files 
city_table.write.mode("overwrite").parquet('dim_city_demog.pq')


In [18]:
# write FACT_IMMIGRATION table to parquet files partitioned by year and city
fact_immigrations_df.limit(100).write.mode("overwrite").partitionBy("i94yr","i94addr").parquet('fact_immig.pq')

In [19]:
#Define S3 Buckets and create Data Lake

#s3 = boto3.resource('s3')
s3 = boto3.client('s3')
BUCKET = 'udacity-capstone-proj'
PREFIX1 = 'dim_visitors'
PREFIX2 = 'dim_airport'
PREFIX3 = 'dim_city_demog'
PREFIX4 = 'fact_immigrations'
root_path1 = '/home/workspace/dim_visitors.pq' # local folder 
root_path2 = '/home/workspace/dim_airport.pq' # local folder 
root_path3 = '/home/workspace/dim_city_demog.pq' # local folder 
root_path4 = '/home/workspace/fact_immig.pq' # local folder 

#s3.meta.client.upload_file('/home/workspace/dim_visitors.pq', BUCKET, PREFIX)

#Upload Parquet files to S3 Bucket
for path, subdirs, files in os.walk(root_path1):
            path = path.replace("\\","/")
            directory_name = path.replace(root_path1,"")
            for file in files:
                s3.upload_file(os.path.join(path, file),BUCKET, PREFIX1)

#Upload Parquet files to S3 Bucket
for path, subdirs, files in os.walk(root_path2):
            path = path.replace("\\","/")
            directory_name = path.replace(root_path2,"")
            for file in files:
                s3.upload_file(os.path.join(path, file),BUCKET, PREFIX2)
                
#Upload Parquet files to S3 Bucket
for path, subdirs, files in os.walk(root_path3):
            path = path.replace("\\","/")
            directory_name = path.replace(root_path3,"")
            for file in files:
                s3.upload_file(os.path.join(path, file),BUCKET, PREFIX3)
                
#Upload Parquet files to S3 Bucket
for path, subdirs, files in os.walk(root_path4):
            path = path.replace("\\","/")
            directory_name = path.replace(root_path4,"")
            for file in files:
                s3.upload_file(os.path.join(path, file),BUCKET, PREFIX4)                
                

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here

#Check the record count in each table and ensure there is data
cnt1 = spark.sql("SELECT * from dim_visitors").count()
print(cnt1)

if cnt1 <= 0:
    print("Error - No Data in dim_visitors !!")
    
#Check the record count in each table and ensure there is data
cnt1 = spark.sql("SELECT * from dim_city_demog").count()
print(cnt1)

if cnt1 <= 0:
    print("Error - No Data in dim_city_demog !!")
    
#Check the record count in each table and ensure there is data
cnt1 = spark.sql("SELECT * from dim_airport").count()
print(cnt1)

if cnt1 <= 0:
    print("Error - No Data in dim_airport !!")
    
#Check the record count in each table and ensure there is data
cnt1 = spark.sql("SELECT * from fact_immigration").count()
print(cnt1)

if cnt1 <= 0:
    print("Error - No Data in fact_immigrations !!")    

3096313
2892
55076


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

In [None]:
Tools and Technologies Used :
    Python - The python wrapper script uses pandas dataframes to easily manipulate huge datasets.
    
    AWS S3 Storage - Data Lake is created on AWS cloud and the data is saved in columnar format in the form of parquet files saved in S3 which is 
    easily available and accessible on the cloud. (Allows extensibility to load the data from S3 to any database , ex: Redshift)
    
    Spark - The Spark instance used in the program enables parallelization of the code in a distributed cluster enabling scalability. PySpark SQL is used to 
    manipulate and load the data to perform ETL.
    
    Data Model - Star schema (Denormalized to support OLAP queries)

In [None]:
Frequency : The data need to be updated every day to record the number of i94 entries each day to track the number of immigrants, visitors etc. entering the US

In [None]:
If the data volume is increased by 100x, the ETL would need to be run in parallel mode on multiple clusters

In [None]:
If the data populates a dashboard that gets updated daily, create an Airflow DAG to be scheduled to run daily at 7am, which will pick the latest(previous days
data) and processes the datafile and runs the ETL.                                                                                                                                            

In [None]:
If the database needs to be accessed by 100+ people, load the data from S3 into Redshift on AWS cloud which is scalable enough to support multiple concurrent 
user access on the cloud.