# Competing for Real Estate
### Data Engineering Capstone Project

#### Project Summary
You are competing in a new real estate market. The objective is to break into the market by capitalizing on short-term and long-term leasing to international visitors in the US, targeting students and business travelers. More specifically, the analysis addresses the following questions for real estate and business forecasting:

* Which country of residence sends the most people to the US daily, monthly, and yearly?
* In which states do they arrive?
* What are the demographics of the states they arrive in?

The answers will help the marketing team target the countries whose residents are forecast to travel to the US, and forecast how many properties they need to list on their sites to meet demand.


    
      

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import configparser
import os
from datetime import datetime, timedelta

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.functions import (col, countDistinct, date_format, datediff, dayofmonth,
                                   hour, month, udf, weekofyear, year)
from pyspark.sql.types import IntegerType, TimestampType


### Step 1: Scope the Project and Gather Data

#### Scope 
In this project we extract data from the sources described below, using Apache Spark for the large datasets that need to scale, and build an ETL pipeline that organizes the data into a star schema for the team. For the smaller data sources we use pandas DataFrames.






#### Describe and Gather Data

To accomplish this study the following data sets were used:

* **I94 Immigration Data**: This data comes from the US National Tourism and Trade Office. A data dictionary is included in the workspace.
   * country_code.csv: the country codes used in the dataset, extracted from the data dictionary.
   * iport_data.csv: the port-of-entry codes with their location and state, extracted from the data dictionary.
       
* **U.S. City Demographic Data**: This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey.

* **Airport Code Table**: This table has airport codes with their corresponding municipalities and states.



In [2]:
pd.set_option('display.max_columns', 50)


## Immigration Dataset

We will go through each CSV file and explore its columns and organization in order to put together a data model.

In [3]:
df_immig = pd.read_csv('immigration_data_sample.csv')

In [4]:
df_immig.head()

| | Unnamed: 0 | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | i94bir | i94visa | count | dtadfile | visapost | occup | entdepa | entdepd | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2027561 | 4084316.0 | 2016.0 | 4.0 | 209.0 | 209.0 | HHW | 20566.0 | 1.0 | HI | 20573.0 | 61.0 | 2.0 | 1.0 | 20160422 | | | G | O | | M | 1955.0 | 7202016 | F | | JL | 56582670000.0 | 00782 | WT |
| 1 | 2171295 | 4422636.0 | 2016.0 | 4.0 | 582.0 | 582.0 | MCA | 20567.0 | 1.0 | TX | 20568.0 | 26.0 | 2.0 | 1.0 | 20160423 | MTR | | G | R | | M | 1990.0 | 10222016 | M | | *GA | 94362000000.0 | XBLNG | B2 |
| 2 | 589494 | 1195600.0 | 2016.0 | 4.0 | 148.0 | 112.0 | OGG | 20551.0 | 1.0 | FL | 20571.0 | 76.0 | 2.0 | 1.0 | 20160407 | | | G | O | | M | 1940.0 | 7052016 | M | | LH | 55780470000.0 | 00464 | WT |
| 3 | 2631158 | 5291768.0 | 2016.0 | 4.0 | 297.0 | 297.0 | LOS | 20572.0 | 1.0 | CA | 20581.0 | 25.0 | 2.0 | 1.0 | 20160428 | DOH | | G | O | | M | 1991.0 | 10272016 | M | | QR | 94789700000.0 | 00739 | B2 |
| 4 | 3032257 | 985523.0 | 2016.0 | 4.0 | 111.0 | 111.0 | CHM | 20550.0 | 3.0 | NY | 20553.0 | 19.0 | 2.0 | 1.0 | 20160406 | | | Z | K | | M | 1997.0 | 7042016 | F | | | 42322570000.0 | LAND | WT |


In [5]:
df_immig.columns

Index(['Unnamed: 0', 'cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port',
       'arrdate', 'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa',
       'count', 'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd',
       'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'insnum',
       'airline', 'admnum', 'fltno', 'visatype'],
      dtype='object')

The field definitions are in the file **I94_SAS_Labels_Description.SAS**. We will be working with the following:
* arrdate: Arrival date
* i94cit: Country of citizenship
* i94res: Country of residence
* i94port: Port of entry
* i94mode: Mode of arrival (air, sea, land)
* i94visa: Visa category (business, pleasure, student)
* depdate: Departure date
* i94addr: State of arrival
* count: Number of people
* i94yr: Year of arrival
* i94mon: Month of arrival
* gender: Gender





#### Data from the Data Dictionary

To get the names of the cities and countries (for both country of citizenship and country of residence), we use the following lookup tables extracted from the data dictionary.

For country_code.csv we check that there are no nulls in `code` or `country`, since the lookup must be complete for the joins later in the pipeline.

In [6]:
df_country = pd.read_csv('country_code.csv')

In [7]:
df_country.shape

(289, 2)

In [8]:
df_country.isnull().sum()

code       0
country    0
dtype: int64

Just as with country_code.csv, we also check iport_data.csv to make sure that it is complete.

In [9]:
df_port = pd.read_csv('iport_data.csv', index_col = False)
df_port.columns

Index(['code', 'location', 'state'], dtype='object')

In [10]:
df_port.shape

(585, 3)

In [11]:
df_port.isnull().sum()

code        0
location    0
state       0
dtype: int64

In [12]:
df_port = df_port[~df_port['state'].isnull()]
df_port.to_csv('iport_data.csv', index = False)

In [13]:
df_port.head()

| | code | location | state |
|---|---|---|---|
| 0 | ALC | ALCAN | AK |
| 1 | ANC | ANCHORAGE | AK |
| 2 | BAR | BAKER AAF - BAKER ISLAND | AK |
| 3 | DAC | DALTONS CACHE | AK |
| 4 | PIZ | DEW STATION PT LAY DEW | AK |


## Demographics Data

The demographics data is important, and it is worth noting that it only lists cities with a population of at least 65,000. The fields we use are listed below. Among them, only Male Population and Female Population have missing values (3 rows each); the other fields we use are complete.

In [14]:
df_demo = pd.read_csv('us-cities-demographics.csv', sep=';')

In [15]:
df_demo.shape

(2891, 12)

In [16]:
df_demo.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [17]:
df_demo.head()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


The fields we use for this analysis are:
* City
* State
* Male Population
* Female Population
* Total Population
* State Code
* Race
* Count

In [18]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')


In [19]:
#write to parquet
#df_spark.write.parquet("sas_data")


In [20]:
spark_df=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

The schema has the same columns as the sample data, apart from the sample CSV's leftover index column `Unnamed: 0`.

In [21]:
spark_df.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [22]:
spark_df.take(5)


[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1'),
 Row(cicid=5748518.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='NV', depdate=20591.0, i94bir=32.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1984.0, dtaddto='10292016', gender='F', insnum=None, airline='VA', admnum=94955622830.0, fltno='00007', visatype='B1'),
 Row(cicid=5748519.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='WA', depdate=20582.0, i94bir=29.

The dates are stored as SAS dates, i.e. the number of days since January 1, 1960. The function below converts them to Spark `DateType` values.

In [23]:
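def convert_datetime(x):
    '''
    Description: This function takes a SAS date (days since 1960-01-01) and returns a date.
    Input: SAS date as a number of days (int or float)
    Output: datetime.date, or None if the value is missing or invalid.
    '''
    try:
        start = datetime(1960, 1, 1)
        return (start + timedelta(days=int(x))).date()
    except (TypeError, ValueError, OverflowError):
        # None raises TypeError and NaN raises ValueError in int()
        return None
udf_datetime_from_sas = udf(convert_datetime, T.DateType())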

In [24]:
spark_df = spark_df.withColumn('arrdate', udf_datetime_from_sas('arrdate'))

In [25]:
spark_df = spark_df.withColumn('depdate', udf_datetime_from_sas('depdate'))
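Because a SAS date is just a day count from 1960-01-01, the conversion can be checked by hand. The standard-library sketch below (the helper name `sas_to_date` is ours, not from the project) reproduces the first sample row, where `arrdate=20574.0` became 2016-04-30 and `depdate=20582.0` became 2016-05-08.

```python
from datetime import date, timedelta
import math

SAS_EPOCH = date(1960, 1, 1)  # day 0 in the SAS date encoding

def sas_to_date(days):
    """Convert a SAS day count (int or float) to a date; None or NaN -> None."""
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_to_date(20574.0))  # arrival date of the first sample row
print(sas_to_date(20582.0))  # departure date of the same row
```

Inside Spark, the same arithmetic can likely be done without a Python UDF, for example `F.expr("date_add('1960-01-01', cast(arrdate as int))")`, which avoids sending every row through Python; it is worth verifying against the Spark version in use.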


Next, we look at which modes of travel (`i94mode`) are present in the data.

In [26]:
spark_df.groupby('i94mode').count().show()

+-------+-------+
|i94mode|  count|
+-------+-------+
|   null|    239|
|    1.0|2994505|
|    3.0|  66660|
|    2.0|  26349|
|    9.0|   8560|
+-------+-------+



In this instance, we only keep arrivals by air, which is code `1.0` (per the data dictionary, 2 = Sea, 3 = Land, and 9 = Not reported). The reasoning is that air arrivals capture the long-distance international trips this study focuses on: we want to see immigration, not migration.

In [27]:
spark_df = spark_df.filter("i94mode == 1.0")
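As a quick sanity check on this filter, the sketch below labels the `i94mode` counts from the output above (copied by hand) with the data-dictionary codes and computes what share of the April 2016 records the air-only filter keeps.

```python
# Counts copied from the groupby('i94mode') output above; None = missing mode.
mode_counts = {None: 239, 1.0: 2994505, 3.0: 66660, 2.0: 26349, 9.0: 8560}

# Labels from I94_SAS_Labels_Description.SAS
MODE_LABELS = {1.0: "Air", 2.0: "Sea", 3.0: "Land", 9.0: "Not reported"}

total = sum(mode_counts.values())
for mode, n in sorted(mode_counts.items(), key=lambda kv: -kv[1]):
    label = MODE_LABELS.get(mode, "Missing")
    print(f"{label:<13}{n:>10,}  {n / total:7.2%}")

air_share = mode_counts[1.0] / total
print(f"Air-only filter keeps {air_share:.1%} of records")
```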

Next, we look at the values in the `gender` column.

In [28]:
spark_df.groupby('gender').count().show()

+------+-------+
|gender|  count|
+------+-------+
|     F|1255002|
|  null| 411902|
|     M|1326478|
|     U|    117|
|     X|   1006|
+------+-------+



Here we see both 'U' (unknown) and null values. Since a null gender is also unknown, we fill the nulls with 'U'.

In [29]:
spark_df = spark_df.na.fill('U', 'gender')

We want to make sure that `i94res` and `i94port` have no null values, i.e. that their non-null counts equal the total number of rows. These are essential features, and we do not want any nulls in them in the fact table.

In [30]:
spark_df.select('*').count()

2994505

In the later steps, `i94res` and `i94port` are central to the organization of the data model, so there must be no missing data in them. Note that for this study I assume that each person travels to the city where their `i94port` (port of entry) is located. That is not always the case, but it is a simplifying assumption for this study.

In [31]:
spark_df.filter('i94res is null').count()

0

In [32]:
spark_df.filter('i94port is null').count()

0

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Since the scope of this project is to forecast future travel from foreign countries to the US, the immigration data will be the fact table.

**fact_immigration** will be:
* immigration_id  (Primary Key)
* gender_id
* arrival_date
* i94res
* i94port
* i94visa
* long_term  (length of stay in days)



There are going to be 5 dimension tables:

**dim_date** (built as `dim_time` in Step 4)
* arrival_date (Primary Key)
* year
* month
* day
* week
* weekday
* dayofyear

**dim_airport**
* i94port  (Primary Key)
* city
* state_code

**dim_visa**
* i94visa  (Primary Key)
* visa

**dim_gender**
* gender_id  (Primary Key)
* gender

**dim_country_origin**
* i94res  (Primary Key)
* country

The following is a separate schema that prioritizes demographics information for research on real estate owners in the area. It can be linked to the immigration schema through the city and state code in dim_airport, and it uses the fact table **fact_demographics**.

**fact_demographics**
* demographics_id  (Primary Key)
* city_id  (Foreign Key to dim_city)
* race
* race_count

**dim_city**
* city_id  (Primary Key; surrogate for the compound key (city, state))
* city
* state
* state_code
* female_population
* male_population




![title](img/schema.png)
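To illustrate how the immigration star schema answers the first project question (which country of residence sends the most arrivals each month), the sketch below joins a few fact rows to a country lookup and aggregates by month in plain Python. The rows, the codes, and the country names are hypothetical placeholders, not values from the dataset.

```python
from collections import Counter
from datetime import date

# Hypothetical rows shaped like fact_immigration: (arrival_date, i94res)
fact_rows = [
    (date(2016, 4, 1), 101),
    (date(2016, 4, 2), 101),
    (date(2016, 4, 2), 202),
    (date(2016, 5, 1), 202),
    (date(2016, 5, 3), 202),
    (date(2016, 5, 9), 101),
]
# Hypothetical dim_country_origin lookup: i94res -> country
dim_country_origin = {101: "COUNTRY_A", 202: "COUNTRY_B"}

def top_country_by_month(rows, countries):
    """Return {(year, month): (country, arrivals)} for the busiest country per month."""
    counts = Counter()
    for arrival_date, i94res in rows:
        country = countries.get(i94res, "UNKNOWN")  # keep unmatched codes visible
        counts[(arrival_date.year, arrival_date.month, country)] += 1
    best = {}
    for (y, m, country), n in counts.items():
        if (y, m) not in best or n > best[(y, m)][1]:
            best[(y, m)] = (country, n)
    return best

print(top_country_by_month(fact_rows, dim_country_origin))
```

In Spark the same question would be roughly `fact_immigration.join(dim_country_origin, 'i94res').groupBy(year('arrival_date'), month('arrival_date'), 'country').count()`, followed by picking the top row per month.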


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

For the **fact_immigration** table, the procedure after cleaning the data with Apache Spark is:
1. Create the airport dimension from iport_data.csv.
2. Create the country-of-origin dimension from country_code.csv.
3. For the time dimension, take the distinct `arrival_date` values from the fact table as DateType() values, then parse out the year, month, day, etc. to create the time dimension table.
4. Parse out the distinct gender codes and create a dimension table for them.
5. Parse out the distinct visa codes from the fact table and create the visa dimension table.
6. Calculate the number of days each person stayed in the US as (departure_date - arrival_date).
7. Finally, write the tables to parquet files.
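Step 6 is a date difference, and the project summary distinguishes short-term from long-term leasing. The sketch below computes stay length in whole days, as Spark's `datediff(end, start)` does, and buckets it. The 30-day cut-off and the helper names are assumptions for illustration, not values taken from the project.

```python
from datetime import date

LONG_TERM_DAYS = 30  # hypothetical cut-off between short- and long-term stays

def stay_days(arrival, departure):
    """Whole days between arrival and departure (like Spark's datediff); None if unknown."""
    if arrival is None or departure is None:
        return None
    return (departure - arrival).days

def lease_term(days):
    """Bucket a stay length into a leasing category."""
    if days is None:
        return "unknown"   # e.g. no departure date recorded
    if days < 0:
        return "invalid"   # departure before arrival: a data-quality problem
    return "long_term" if days >= LONG_TERM_DAYS else "short_term"

d = stay_days(date(2016, 4, 30), date(2016, 5, 8))  # first sample row
print(d, lease_term(d))
```

In Spark a similar bucketing could be written as `F.when(col('long_term').isNull(), 'unknown').when(col('long_term') >= 30, 'long_term').otherwise('short_term')`, again with a threshold you would need to choose.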

For the **fact_demographics** table, the procedure after cleaning the data with pandas DataFrames and Apache Spark is:
1. Group the demographics CSV by city, state, and state code, and take the mean of Female Population and Male Population, since these values repeat on every race row for a city.
2. Assign a unique identifier, city_id, to each (city, state) pair to stand in for the compound primary key.
3. Create a fact table keyed by city_id that sums the number of people per race in each city.
4. Finally, write the results to parquet files.
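In the demographics CSV each city appears once per race, with the city-level population columns repeated on every row. The standard-library sketch below mirrors steps 1 to 3 on a few hypothetical rows; the notebook itself does this with pandas `groupby`. The two Springfields show why the key has to be (city, state) rather than city alone.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical rows: (city, state, state_code, male_pop, female_pop, race, count).
# The population columns repeat on every race row for the same city.
rows = [
    ("Springfield", "Illinois", "IL", 55000, 61000, "White", 80000),
    ("Springfield", "Illinois", "IL", 55000, 61000, "Asian", 3000),
    ("Springfield", "Missouri", "MO", 80000, 84000, "White", 130000),
]

def build_city_tables(rows):
    """Return (dim_city, fact_demographics) lists built from per-race rows."""
    pops = defaultdict(list)        # (CITY, state, state_code) -> [(male, female), ...]
    race_counts = defaultdict(int)  # ((CITY, state, state_code), race) -> people
    for city, state, code, male, female, race, count in rows:
        key = (city.upper(), state, code)  # upper-case to match dim_airport city names
        pops[key].append((male, female))
        race_counts[(key, race)] += count

    # Steps 1 and 2: one row per (city, state), mean of repeated populations, surrogate id
    dim_city, city_ids = [], {}
    for city_id, key in enumerate(sorted(pops)):
        city_ids[key] = city_id
        males, females = zip(*pops[key])
        dim_city.append((city_id, *key, mean(males), mean(females)))

    # Step 3: people per race per city, keyed by the surrogate id
    fact_demographics = [
        (city_ids[key], race, total)
        for (key, race), total in sorted(race_counts.items())
    ]
    return dim_city, fact_demographics

dim_city, fact_demographics = build_city_tables(rows)
print(dim_city)
print(fact_demographics)
```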


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

### Demographic Fact Table and Dimension Tables

We start by creating the fact_demographics and dim_city tables using pandas DataFrames.

In [34]:
df_demo.head()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


City names need to be upper-cased, because all city names in the dim_airport dimension table are in capital letters.
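Joins on free-text place names only work if both sides are normalized the same way. The sketch below assumes that trimming, collapsing whitespace, and upper-casing is enough (the country lookup output later shows at least one value with trailing whitespace, `'MEXICO '`). The helper `normalize_name` is hypothetical, and the sample values are modeled on the outputs above, with the demographics "Anchorage" row invented for illustration.

```python
def normalize_name(name):
    """Trim, collapse internal whitespace and upper-case a place name for joining."""
    return " ".join(name.split()).upper()

# (city, state_code) pairs: demographics side vs. a small subset of dim_airport
demo_cities = [("Silver Spring", "MD"), ("Anchorage", "AK")]
airport_cities = {("ANCHORAGE", "AK"), ("BAKER AAF - BAKER ISLAND", "AK")}

matched = [(c, s) for c, s in demo_cities if (normalize_name(c), s) in airport_cities]
unmatched = [(c, s) for c, s in demo_cities if (normalize_name(c), s) not in airport_cities]
print("matched:", matched)
print("unmatched:", unmatched)
```

Unmatched rows are expected, since dim_airport lists ports of entry rather than every city. In pandas, roughly the same normalization is `df_demo['City'].str.strip().str.upper()`, and in Spark `F.upper(F.trim(col('location')))`.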

In [35]:
df_demo['City'] = df_demo['City'].str.upper()

In [36]:
dimension_city = df_demo.groupby(['City', 'State', 'State Code'])[['Male Population', 'Female Population']].mean()

In [37]:
dimension_city.reset_index(inplace = True)


In [38]:
dimension_city.columns

Index(['City', 'State', 'State Code', 'Male Population', 'Female Population'], dtype='object')

In [39]:
dimension_city= dimension_city.reset_index().rename(columns={'index': 'city_id'})

In [40]:
dimension_city.head()

| | city_id | City | State | State Code | Male Population | Female Population |
|---|---|---|---|---|---|---|
| 0 | 0 | ABILENE | Texas | TX | 65212.0 | 60664.0 |
| 1 | 1 | AKRON | Ohio | OH | 96886.0 | 100667.0 |
| 2 | 2 | ALAFAYA | Florida | FL | 39504.0 | 45760.0 |
| 3 | 3 | ALAMEDA | California | CA | 37747.0 | 40867.0 |
| 4 | 4 | ALBANY | Georgia | GA | 31695.0 | 39414.0 |


In [41]:
temp_demo = dimension_city[['City', 'State', 'city_id', 'State Code']]

In [42]:
df_demo = pd.merge(df_demo,temp_demo, on = ['City', 'State', 'State Code'], how = 'left' )

In [43]:
fact_demo = df_demo.groupby(['city_id', 'Race'])['Count'].sum()

In [44]:
fact_demo.head().reset_index()

| | city_id | Race | Count |
|---|---|---|---|
| 0 | 0 | American Indian and Alaska Native | 1813 |
| 1 | 0 | Asian | 2929 |
| 2 | 0 | Black or African-American | 14449 |
| 3 | 0 | Hispanic or Latino | 33222 |
| 4 | 0 | White | 95487 |


In [45]:
fact_demo = fact_demo.reset_index()  # columns: city_id, Race, Count; demographics_id is added in Spark below

In [46]:
dim_city = spark.createDataFrame(dimension_city)

In [47]:
dim_city = dim_city.select('city_id',
                           col('City').alias('city'),
                           col('State Code').alias('state_code'),
                           col('Female Population').alias('female_population'),
                           col('Male Population').alias('male_population'))

In [48]:
fact_demographics = spark.createDataFrame(fact_demo)
fact_demographics =fact_demographics.withColumn('demographics_id', F.monotonically_increasing_id())

### Immigration Fact Table and Dimensions

In this section we create the fact_immigration, dim_airport, dim_country_origin, dim_gender, dim_time, and dim_visa tables using Apache Spark.

In [49]:
immigration_df = spark_df.select( 
                col('arrdate').alias('arrival_date'), 
                col('depdate').alias('departure_date'),
                col('i94visa'),    
                'i94port',
                'i94res', 
                'gender')

In [50]:
immigration_df.head()

Row(arrival_date=datetime.date(2016, 4, 30), departure_date=datetime.date(2016, 5, 8), i94visa=1.0, i94port='LOS', i94res=438.0, gender='F')

In [51]:
country_df = spark.read.format('csv').option('header', 'True').load('country_code.csv')

In [52]:
country_df = country_df.withColumn('code', country_df.code.cast(IntegerType()))

In [53]:
dim_country_origin= country_df.select(
                            col('code').alias('i94res'),
                            'country')

In [54]:
dim_country_origin.head()

Row(i94res=582, country='MEXICO ')

In [56]:
airport_df =  spark.read.format('csv').option('header', 'True').load('iport_data.csv')

In [57]:
airport_df.head()

Row(code='ALC', location='ALCAN', state='AK')

In [58]:
dim_airport = airport_df.select(col('code').alias('i94port'),
                              col('location').alias('city'), 
                              col('state').alias('state_code'))

In [59]:
immigration_df= immigration_df.withColumn('long_term', datediff(col('departure_date'), col('arrival_date')))

In [60]:
GENDER_LABELS = {'F': 'Female', 'M': 'Male', 'U': 'Unknown', 'X': 'Third Gender'}

def convert_gender(x):
    '''
    Description: Function takes in a gender_id and returns the label for the id.
    Input: gender_id
    Output: Label ('Null' for unrecognized codes)
    '''
    return GENDER_LABELS.get(x, 'Null')
udf_gender = udf(convert_gender, T.StringType())

In [61]:
gender_df = immigration_df.select(col('gender').alias('gender_id')).distinct()
dim_gender= gender_df.withColumn('gender', udf_gender('gender_id'))

In [62]:
dim_gender.head()

Row(gender_id='F', gender='Female')

In [63]:
def convert_visa(x):
    '''
    Description: Function takes in a visa id (I94VISA code) and returns a label.
    Input: visa id (1 = Business, 2 = Pleasure, 3 = Student)
    Output: Label, or None for unknown codes
    '''
    if x == 1:
        return 'Business'
    elif x == 2:
        return 'Pleasure'
    elif x == 3:
        return 'Student'
    return None
udf_visa = udf(convert_visa, T.StringType())

In [64]:
visa_df = immigration_df.select('i94visa').distinct()

In [65]:
dim_visa= visa_df.withColumn('visa', udf_visa('i94visa'))

In [66]:
dim_visa.head()

Row(i94visa=1.0, visa='Business')

In [67]:
immigration_df.printSchema()

root
 |-- arrival_date: date (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- i94res: double (nullable = true)
 |-- gender: string (nullable = false)
 |-- long_term: integer (nullable = true)



In [68]:
fact_immigration= immigration_df.select('arrival_date', 
                                      'i94visa', 
                                      'i94port', 
                                      'i94res', 
                                      col('gender').alias('gender_id'),
                                      'long_term')\
                                    .withColumn('immigration_id', F.monotonically_increasing_id())

In [69]:
#fact_immigration= immigration_df.groupby('arrival_date', 'i94visa', 'i94port', 'i94res', 'gender_id', 'long_term').count()\
#                .withColumn('unique_id',  F.monotonically_increasing_id())


In [70]:
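# hour is omitted because arrival_date has no time component; F.dayofweek needs Spark 2.3+
dim_time = fact_immigration.select('arrival_date').distinct() \
    .withColumn('day', dayofmonth(col('arrival_date'))) \
    .withColumn('week', weekofyear(col('arrival_date'))) \
    .withColumn('month', month(col('arrival_date'))) \
    .withColumn('year', year(col('arrival_date'))) \
    .withColumn('weekday', F.dayofweek(col('arrival_date'))) \
    .withColumn('dayofyear', F.dayofyear(col('arrival_date')))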
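When spot-checking the written time dimension, it helps to compute the model's date attributes (year, month, day, week, weekday, dayofyear) independently. The sketch below does this for one date in plain Python, following Spark's documented conventions as I understand them: `weekofyear` uses ISO 8601 weeks and `dayofweek` numbers Sunday as 1 through Saturday as 7. The function name is hypothetical.

```python
from datetime import date

def date_attributes(d):
    """Calendar attributes for a dim_date row, using Spark's numbering conventions."""
    return {
        "arrival_date": d,
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": d.isocalendar()[1],         # ISO 8601 week, like Spark's weekofyear
        "weekday": d.isoweekday() % 7 + 1,  # 1 = Sunday ... 7 = Saturday, like dayofweek
        "dayofyear": d.timetuple().tm_yday,
    }

print(date_attributes(date(2016, 4, 30)))  # arrival date of the first sample row
```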

## Write parquet files

In [71]:
fact_immigration.write.mode('overwrite').parquet('fact_immigration')
dim_gender.write.mode('overwrite').parquet('dim_gender')
dim_visa.write.mode('overwrite').parquet('dim_visa')


In [73]:
dim_airport.write.mode('overwrite').parquet('dim_airport')
dim_time.write.mode('overwrite').parquet('dim_time')
fact_demographics.write.mode('overwrite').parquet('fact_demographics')
dim_city.write.mode('overwrite').parquet('dim_city')
dim_country_origin.write.mode('overwrite').parquet('dim_country_origin')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [74]:
def check_table_null(tables):
    '''
    Description: This function takes in a list of [Spark dataframe, table name] pairs and checks
        whether each table is empty. If a table has no rows, it raises a ValueError.

    Input:
        tables - list of [Spark DataFrame, table name] pairs
    Output: Either raises a ValueError or prints pass statements.
    '''
    for table, name in tables:
        print(f"Checking primary key for table {name}...")
        row_count = table.count()
        if row_count == 0:
            raise ValueError(f"Table {name} is empty! Quality Check Failed")
        print(f"Table {name} passed.")

In [78]:
fact_dim = [[fact_immigration, 'fact_immigration'], 
            [dim_gender, 'dim_gender'],
            [dim_visa, 'dim_visa'],
            [dim_country_origin, 'dim_country_origin'],
            [dim_airport, 'dim_airport'], 
            [dim_time, 'dim_time'], 
            [fact_demographics, 'fact_demographics'],
            [dim_city, 'dim_city']]

In [79]:
check_table_null(fact_dim)

Checking primary key for table fact_immigration...
Table fact_immigration passed.
Checking primary key for table dim_gender...
Table dim_gender passed.
Checking primary key for table dim_visa...
Table dim_visa passed.
Checking primary key for table dim_country_origin...
Table dim_country_origin passed.
Checking primary key for table dim_airport...
Table dim_airport passed.
Checking primary key for table dim_time...
Table dim_time passed.
Checking primary key for table fact_demographics...
Table fact_demographics passed.
Checking primary key for table dim_city...
Table dim_city passed.


In [80]:
def table_check_primary_keys_null(tables, primary_keys):
    '''
    Description: This function takes in two lists: one of Spark dataframes and one of
        [table name, primary key columns] pairs for the respective dataframes. It checks whether
        any primary key column contains nulls and either raises a ValueError or prints a pass.
    Input:
        tables - list of Spark dataframes.
        primary_keys - list of [table name, list of primary key columns] pairs, in the same order.
    Output:
        It either raises a ValueError or prints a pass statement.
    '''
    for table, (name, keys) in zip(tables, primary_keys):
        print(f"Checking primary key for table {name}...")
        for pr in keys:
            null_values = table.where(col(pr).isNull()).count()
            if null_values > 0:
                raise ValueError(f"Primary key is null for {pr} column in table {name}! Failed Quality Check")
        print(f"Table {name} passed.")

In [82]:


fact_dim_tables = [fact_immigration, dim_gender, dim_visa, dim_airport, dim_time,
                   fact_demographics, dim_city, dim_country_origin]
primary = [['fact_immigration', ['immigration_id']],
           ['dim_gender', ['gender_id']],
           ['dim_visa', ['i94visa']],
           ['dim_airport', ['i94port']],
           ['dim_time', ['arrival_date']],
           ['fact_demographics', ['demographics_id']],
           ['dim_city', ['city', 'state_code']],
           ['dim_country_origin', ['i94res']]]
table_check_primary_keys_null(fact_dim_tables, primary)

Checking primary key for table fact_immigration...
Table fact_immigration passed.
Checking primary key for table dim_gender...
Table dim_gender passed.
Checking primary key for table dim_visa...
Table dim_visa passed.
Checking primary key for table dim_airport...
Table dim_airport passed.
Checking primary key for table dim_time...
Table dim_time passed.
Checking primary key for table fact_demographics...
Table fact_demographics passed.
Checking primary key for table dim_city...
Table dim_city passed.
Checking primary key for table dim_country_origin...
Table dim_country_origin passed.
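The checks above cover empty tables and null keys, but not the unique-key constraint mentioned in the checklist. A minimal sketch of a duplicate-key check is below, written over plain Python rows so that it runs standalone; the function name and sample rows are hypothetical. In Spark, the equivalent would be something like `df.groupBy(*keys).count().filter('count > 1').count()`.

```python
from collections import Counter

def check_unique_keys(rows, key_fields, table_name):
    """Raise ValueError if any combination of key_fields repeats in rows (a list of dicts)."""
    counts = Counter(tuple(row[k] for k in key_fields) for row in rows)
    duplicates = {key: n for key, n in counts.items() if n > 1}
    if duplicates:
        raise ValueError(f"Table {table_name} has duplicate keys {duplicates}! Quality Check Failed")
    print(f"Table {table_name} passed uniqueness check.")

dim_gender_rows = [{"gender_id": g} for g in ("F", "M", "U", "X")]
check_unique_keys(dim_gender_rows, ["gender_id"], "dim_gender")
```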


#### Data Dictionary

The data dictionary is in the data_dictionary.txt file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

For data that could scale, such as the **I94 Immigration** data, I used Apache Spark for the ETL process so that it keeps working as the data grows. For finite data, like the demographics data, which covers a limited number of cities, I used pandas DataFrames.

For this project the data should be updated daily, because the immigration data is recorded at the daily level (by arrival date), and daily updates would help validate the forecasting models it feeds.

If the data increased by 100x, I would store it as Parquet files in an S3 bucket and run the ETL on an AWS EMR cluster, distributing the processing across the cluster's nodes.

If the data populated a dashboard that must be updated by 7 a.m. every day, I would use Apache Airflow with a Spark operator to create a cluster, run the ETL process, and shut the cluster down when it is done.

If the database needed to be accessed by 100+ people, I would give them an IAM role to access the S3 bucket.