# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
from datetime import datetime, timedelta
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.types import TimestampType, StringType, DateType
import pyspark.sql.functions as F

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

The main data source is the I94 Immigration Data, which comes from the US National Tourism and Trade Office. The original source is https://travel.trade.gov/research/reports/i94/historical/2016.html. The file I94_SAS_Labels_Descriptions.SAS describes this data and lists the code descriptions for its dimensions, such as country code and visa code. Some other tables are also available to help create more dimension tables. I plan to create one file that contains all of the dimension data from the immigration labels file so that it can be read easily, and one fact table based on the immigration data. The US cities demographics data will be transformed into a dimension table. I will use Spark to process data of this size.

In [2]:
# Read in the data here
df = pd.read_csv('immigration_data_sample.csv')

In [3]:
df.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [105]:
	
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
#df_spark = spark.read.format("csv").option("header", "true").load("immigration_data_sample.csv")

In [106]:
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [107]:
df_spark.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [108]:
df_spark.describe()

DataFrame[summary: string, cicid: string, i94yr: string, i94mon: string, i94cit: string, i94res: string, i94port: string, arrdate: string, i94mode: string, i94addr: string, depdate: string, i94bir: string, i94visa: string, count: string, dtadfile: string, visapost: string, occup: string, entdepa: string, entdepd: string, entdepu: string, matflag: string, biryear: string, dtaddto: string, gender: string, insnum: string, airline: string, admnum: string, fltno: string, visatype: string]

In [109]:
df_temp = spark.read.option("header","true").csv( '../../data2/GlobalLandTemperaturesByCity.csv')
df_temp.createOrReplaceTempView("temperature")

In [110]:
df_temp.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



In [111]:
df_airport = spark.read.option("header","true").csv( 'airport-codes_csv.csv')
df_airport.createOrReplaceTempView("airport")

In [112]:
df_airport.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [113]:
df_spark_clean = df_spark.dropna(how = "any", subset = ["fltno", "gender", "airline", "depdate"])

In [114]:
df_spark_clean.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    null| null|      T|      O|   null|      M| 1961.0|09302016|     M|  null|     OS|  6.66643185E8|   93|      B2|
| 27.0|2016.0|   4.0| 101.0| 101.0|    BOS|20545.0|    1.0|     MA|20549.0|  58.0|    1.0|  1.0|20160401|     TI

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

A star schema was chosen because it is straightforward and easy to use and understand. The steps are:
1. Convert the numeric code columns (read as double) to int so that they can be mapped to the related dimension tables.
2. Transform the labels description file into an organized dim.cfg that Python's config parser can read easily.
3. Extract the related dimension data from the immigration table.
4. Use the immigration table as the fact table.
5. Use the US cities demographics data as another dimension table, joined to the immigration table on the i94addr column.
6. Drop the airport codes and temperature tables, because no relationship between them and the immigration table could be found.
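
Step 2 can be illustrated with a small, self-contained sketch of how a config file of code labels might be laid out and queried. The section names follow the ones used later in this notebook, but the excerpt itself (`SAMPLE_CFG`) and the helper name `lookup` are assumptions for illustration; the real dim.cfg may be organized differently.

```python
from configparser import ConfigParser

# Hypothetical excerpt of dim.cfg. The real file is derived from
# I94_SAS_Labels_Descriptions.SAS and its exact layout is an assumption here.
SAMPLE_CFG = """
[country]
101 = 'ALBANIA'
102 = 'ANDORRA'

[mode]
1 = 'Air'
2 = 'Sea'
3 = 'Land'
9 = 'Not reported'
"""

config = ConfigParser()
config.read_string(SAMPLE_CFG)


def lookup(section, key, default="N/A"):
    """Return the label for a code, or a default when the code is unknown."""
    try:
        # Values keep their SAS-style quotes, so strip them on the way out
        return config[section][str(key)].replace("'", "")
    except KeyError:
        # Raised for a missing section as well as for a missing key
        return default


print(lookup("country", 101))  # ALBANIA
print(lookup("mode", 3))       # Land
print(lookup("country", 999))  # N/A
```

Keeping the labels in one config file means each dimension table can be built with the same lookup function and a different section name.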

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [115]:
df_spark_valid = df_spark_clean.select(col("*"), df_spark_clean.i94cit.cast('int').alias('i94cit_int'))

In [116]:
df_spark_valid = df_spark_valid.select(col("*"), df_spark_valid.i94yr.cast('int').alias('i94yr_int'))

In [117]:
df_spark_valid = df_spark_valid.select(col("*"), df_spark_valid.i94mon.cast('int').alias('i94mon_int'))

In [118]:
df_spark_valid = df_spark_valid.select(col("*"), df_spark_valid.i94bir.cast('int').alias('i94bir_int'))

In [119]:
df_spark_valid.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+----------+---------+----------+----------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|i94cit_int|i94yr_int|i94mon_int|i94bir_int|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+----------+---------+----------+----------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|20691.0|  55.0|    2.0|  1.0|20160401|    null| null|      T|      O|   null|      M| 1961.0|09302016|     M|  null|     OS|  6.66643185E8

In [120]:
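def to_datetime(x):
    """Convert a SAS date (days since 1960-01-01) to a Python date."""
    try:
        return (datetime(1960, 1, 1) + timedelta(days=int(x))).date()
    except (TypeError, ValueError, OverflowError):
        # Null, NaN, or out-of-range input yields a null date
        return None
udf_to_datetime_sas = udf(to_datetime, DateType())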
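
The conversion relies on SAS storing dates as a count of days since 1960-01-01. The sketch below checks that arithmetic in plain Python, outside Spark, using the `arrdate` and `depdate` values of the first cleaned row shown in this notebook. The names `SAS_EPOCH` and `sas_days_to_date` are illustrative and not part of the pipeline.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts days from this date


def sas_days_to_date(days):
    """Convert a SAS day count (possibly a float or None) to a date."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


# Values taken from the first cleaned row shown in this notebook
print(sas_days_to_date(20545.0))  # 2016-04-01
print(sas_days_to_date(20691.0))  # 2016-08-25
```

Spark's built-in `date_add` SQL function could probably do the same work without a Python UDF, which tends to be faster on large inputs, but that variant has not been tried here.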

In [121]:
df_spark_valid = df_spark_valid.withColumn("arrivedate", udf_to_datetime_sas('arrdate'))

In [122]:
df_spark_valid = df_spark_valid.withColumn("depardate", udf_to_datetime_sas('depdate'))

In [123]:
df_spark_valid = df_spark_valid.select(col("*"), df_spark_valid.i94mode.cast('int').alias('i94mode_int'))

In [124]:
df_spark_valid = df_spark_valid.select(col("*"), df_spark_valid.i94visa.cast('int').alias('i94visa_int'))

In [125]:
df_spark_valid.show(2)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+----------+---------+----------+----------+----------+----------+-----------+-----------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|i94cit_int|i94yr_int|i94mon_int|i94bir_int|arrivedate| depardate|i94mode_int|i94visa_int|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+----------+---------+----------+----------+----------+----------+-----------+-----------+
| 15.0|2016.0|   4.0| 101.0| 101.0|    WAS|20545.0|    1.0|     MI|

In [126]:
df_spark_valid.createOrReplaceTempView("immigration_valid")

In [127]:
country_code_table = spark.sql("""
    SELECT distinct i94cit_int as code
    FROM immigration_valid 
""")

In [128]:
country_code_table.show(5)

+----+
|code|
+----+
| 148|
| 392|
| 516|
| 251|
| 255|
+----+
only showing top 5 rows



In [129]:
from configparser import ConfigParser
Config = ConfigParser()
Config.read('dim.cfg')
def from_code(mysection, mykey):
    myvalue = ""
    try:
        myvalue = Config[mysection][str(mykey)].replace("'","")
    except:
        myvalue = "N/A"
    return myvalue

In [130]:
get_countrycode = udf(lambda x: from_code("country", x), StringType())
country_table = country_code_table.withColumn("country", get_countrycode('code'))

In [131]:
country_table.sort("code").show(5)

+----+--------+
|code| country|
+----+--------+
| 101| ALBANIA|
| 102| ANDORRA|
| 103| AUSTRIA|
| 104| BELGIUM|
| 105|BULGARIA|
+----+--------+
only showing top 5 rows



In [132]:
country_table.write.parquet("data/tables/country", "overwrite")

In [133]:
country_table.count()

233

In [134]:
country_table.createOrReplaceTempView("country")

In [135]:
port_code_table = spark.sql("""
    SELECT distinct i94port as code
    FROM immigration_valid 
""")

In [136]:
get_portcode = udf(lambda x: from_code("port", "'"+x+"'"), StringType())
port_table = port_code_table.withColumn("port", get_portcode('code'))

In [137]:
port_table.show(5)

+----+--------------------+
|code|                port|
+----+--------------------+
| FMY|FORT MYERS, FL   ...|
| BGM|BANGOR, ME       ...|
| HEL|HELENA, MT       ...|
| FOK|  SUFFOLK COUNTY, NY|
| SNA|SAN ANTONIO, TX  ...|
+----+--------------------+
only showing top 5 rows



In [138]:
port_table.write.parquet("data/tables/port", "overwrite")

In [139]:
port_table.createOrReplaceTempView("port")

In [140]:
mode_code_table = spark.sql("""
    SELECT distinct i94mode_int as code
    FROM immigration_valid 
""")

In [141]:
mode_code_table.show()

+----+
|code|
+----+
|   1|
|   3|
|   9|
|   2|
+----+



In [142]:
get_modecode = udf(lambda x: from_code("mode", x), StringType())
mode_table = mode_code_table.withColumn("mode", get_modecode('code'))

In [143]:
mode_table.show()

+----+------------+
|code|        mode|
+----+------------+
|   1|         Air|
|   3|        Land|
|   9|Not reported|
|   2|         Sea|
+----+------------+



In [144]:
mode_table.write.parquet("data/tables/mode", "overwrite")

In [168]:
mode_table.createOrReplaceTempView("mode")

In [145]:
visa_code_table = spark.sql("""
    SELECT distinct i94visa_int as code
    FROM immigration_valid 
""")

In [146]:
visa_code_table.show()

+----+
|code|
+----+
|   1|
|   3|
|   2|
+----+



In [147]:
get_visacode = udf(lambda x: from_code("visa", x), StringType())
visa_table = visa_code_table.withColumn("visa", get_visacode('code'))

In [148]:
visa_table.show()

+----+--------+
|code|    visa|
+----+--------+
|   1|Business|
|   3| Student|
|   2|Pleasure|
+----+--------+



In [149]:
visa_table.write.parquet("data/tables/visa", "overwrite")

In [169]:
visa_table.createOrReplaceTempView("visa")

In [150]:
df_cities = spark.read.option("header","true").option("delimiter", ";").csv( 'us-cities-demographics.csv')
df_cities.createOrReplaceTempView("cities")

In [151]:
df_cities.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

In [152]:
state_code_table = spark.sql("""
    SELECT distinct i94addr as code
    FROM immigration_valid 
""")

In [153]:
state_code_table.createOrReplaceTempView("states_code")

In [154]:
state_code_table.show()

+----+
|code|
+----+
|  .N|
|  CI|
|  TC|
|  SC|
|  AZ|
|  NS|
|  SL|
|  LA|
|  NL|
|  MN|
|  NK|
|  OI|
|  AA|
|  NJ|
|  MX|
|   F|
|  JF|
|  DC|
|  CN|
|  OR|
+----+
only showing top 20 rows



In [155]:
state_code_table.count()

178

In [156]:
states_table = spark.sql("""
SELECT states_code.code, avg(cities.`Median Age`) as Median_Age, sum(cities.`Total Population`) as Total_Population
FROM cities inner join states_code on (cities.`State Code` = states_code.code) group by states_code.code
""")

In [157]:
states_table.show(5)

+----+------------------+----------------+
|code|        Median_Age|Total_Population|
+----+------------------+----------------+
|  SC| 33.82500000000001|       2586976.0|
|  AZ| 35.03750000000002|      2.249771E7|
|  LA| 34.62500000000001|       6502975.0|
|  MN|35.579629629629636|       7044165.0|
|  NJ|35.254385964912295|       6931024.0|
+----+------------------+----------------+
only showing top 5 rows
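
One caveat about this aggregation: the `Race` and `Count` columns suggest that the demographics file holds several rows per city, one per race. If that is the case, summing `Total Population` over all rows counts each city more than once. The sketch below shows the idea of deduplicating by city before summing, in plain Python with made-up rows; it is an illustration under that assumption, not the query used above.

```python
from collections import defaultdict

# Made-up rows in the shape (City, State Code, Total Population, Race).
# The numbers are illustrative only, not taken from the dataset.
rows = [
    ("Alpha", "XX", 100, "White"),
    ("Alpha", "XX", 100, "Asian"),
    ("Beta", "XX", 50, "White"),
    ("Gamma", "YY", 70, "White"),
]

# Keep one population figure per (city, state) before summing
city_population = {(city, state): pop for city, state, pop, _race in rows}

state_population = defaultdict(int)
for (_city, state), pop in city_population.items():
    state_population[state] += pop

print(dict(state_population))  # {'XX': 150, 'YY': 70}
```

A plain sum over the four rows would report 250 for state `XX`, because city `Alpha` appears twice.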



In [158]:
states_table.write.parquet("data/tables/states", "overwrite")

In [159]:
date_table = spark.sql("""
SELECT 
    arrivedate as date, 
    day(arrivedate) AS day,
    weekofyear(arrivedate) AS week,
    month(arrivedate) AS month,
    year(arrivedate) AS year, 
    weekday(arrivedate) AS weekday 
FROM (
    SELECT DISTINCT arrivedate 
    FROM immigration_valid     
    )
""")

In [160]:
date_table.show()

+----------+---+----+-----+----+-------+
|      date|day|week|month|year|weekday|
+----------+---+----+-----+----+-------+
|2016-04-25| 25|  17|    4|2016|      0|
|2016-04-22| 22|  16|    4|2016|      4|
|2016-04-30| 30|  17|    4|2016|      5|
|2016-04-26| 26|  17|    4|2016|      1|
|2016-04-04|  4|  14|    4|2016|      0|
|2016-04-16| 16|  15|    4|2016|      5|
|2016-04-18| 18|  16|    4|2016|      0|
|2016-04-11| 11|  15|    4|2016|      0|
|2016-04-29| 29|  17|    4|2016|      4|
|2016-04-19| 19|  16|    4|2016|      1|
|2016-04-14| 14|  15|    4|2016|      3|
|2016-04-08|  8|  14|    4|2016|      4|
|2016-04-02|  2|  13|    4|2016|      5|
|2016-04-20| 20|  16|    4|2016|      2|
|2016-04-13| 13|  15|    4|2016|      2|
|2016-04-09|  9|  14|    4|2016|      5|
|2016-04-17| 17|  15|    4|2016|      6|
|2016-04-01|  1|  13|    4|2016|      4|
|2016-04-07|  7|  14|    4|2016|      3|
|2016-04-12| 12|  15|    4|2016|      1|
+----------+---+----+-----+----+-------+
only showing top
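
The meaning of each derived column can be cross-checked with Python's `datetime` module. The sketch below rebuilds one row of the date table; `date_dimension_row` is an illustrative helper, not part of the pipeline. It assumes that Spark's `weekofyear` follows ISO 8601 week numbering and that `weekday` counts from 0 for Monday, which agrees with the output above.

```python
from datetime import date


def date_dimension_row(d):
    """Derive the date-table attributes for a single date."""
    return {
        "date": d.isoformat(),
        "day": d.day,                # day of the month
        "week": d.isocalendar()[1],  # ISO 8601 week number
        "month": d.month,
        "year": d.year,
        "weekday": d.weekday(),      # 0 = Monday ... 6 = Sunday
    }


print(date_dimension_row(date(2016, 4, 25)))
# {'date': '2016-04-25', 'day': 25, 'week': 17, 'month': 4, 'year': 2016, 'weekday': 0}
```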

In [161]:
immigration_table = spark.sql("""
SELECT i94yr_int as year, i94mon_int as month, i94cit_int as citizenship, i94port as port, i94mode_int as mode, i94addr as state,  arrivedate, depardate, i94visa_int as visa, i94bir_int as age, airline, fltno, visatype
FROM immigration_valid
""")

In [162]:
immigration_table.show()

+----+-----+-----------+----+----+-----+----------+----------+----+---+-------+-----+--------+
|year|month|citizenship|port|mode|state|arrivedate| depardate|visa|age|airline|fltno|visatype|
+----+-----+-----------+----+----+-----+----------+----------+----+---+-------+-----+--------+
|2016|    4|        101| WAS|   1|   MI|2016-04-01|2016-08-25|   2| 55|     OS|   93|      B2|
|2016|    4|        101| BOS|   1|   MA|2016-04-01|2016-04-05|   1| 58|     LH|00422|      B1|
|2016|    4|        101| ATL|   1|   MA|2016-04-01|2016-04-05|   1| 56|     LH|00422|      B1|
|2016|    4|        101| ATL|   1|   MA|2016-04-01|2016-04-17|   2| 62|     AZ|00614|      B2|
|2016|    4|        101| ATL|   1|   NJ|2016-04-01|2016-05-04|   2| 49|     OS|00089|      B2|
|2016|    4|        101| ATL|   1|   NY|2016-04-01|2016-06-06|   2| 43|     OS|00089|      B2|
|2016|    4|        101| HOU|   1|   TX|2016-04-01|2016-04-10|   2| 53|     TK|00033|      B2|
|2016|    4|        101| NYC|   1|   NJ|2016-04-01

In [163]:
immigration_table

DataFrame[year: int, month: int, citizenship: int, port: string, mode: int, state: string, arrivedate: date, depardate: date, visa: int, age: int, airline: string, fltno: string, visatype: string]

In [164]:
immigration_table = immigration_table.withColumn("immigration_id", F.monotonically_increasing_id())
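
The IDs produced by `monotonically_increasing_id` are unique and increasing, but not consecutive across partitions. According to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the row number within the partition in the lower 33 bits. The sketch below mimics that layout in plain Python to show why the values jump between partitions; `monotonic_id` is an illustrative function, not a Spark API.

```python
RECORD_BITS = 33  # lower bits hold the row number within a partition


def monotonic_id(partition_id, row_in_partition):
    """Mimic the documented ID layout: partition in the upper bits, row number below."""
    return (partition_id << RECORD_BITS) + row_in_partition


print(monotonic_id(0, 0))  # 0
print(monotonic_id(0, 5))  # 5
print(monotonic_id(1, 0))  # 8589934592
```

The IDs 0 to 5 in the output of this notebook all come from the first partition, so the key should be treated as a surrogate identifier rather than a row counter.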

In [165]:
immigration_table.show()

+----+-----+-----------+----+----+-----+----------+----------+----+---+-------+-----+--------+--------------+
|year|month|citizenship|port|mode|state|arrivedate| depardate|visa|age|airline|fltno|visatype|immigration_id|
+----+-----+-----------+----+----+-----+----------+----------+----+---+-------+-----+--------+--------------+
|2016|    4|        101| WAS|   1|   MI|2016-04-01|2016-08-25|   2| 55|     OS|   93|      B2|             0|
|2016|    4|        101| BOS|   1|   MA|2016-04-01|2016-04-05|   1| 58|     LH|00422|      B1|             1|
|2016|    4|        101| ATL|   1|   MA|2016-04-01|2016-04-05|   1| 56|     LH|00422|      B1|             2|
|2016|    4|        101| ATL|   1|   MA|2016-04-01|2016-04-17|   2| 62|     AZ|00614|      B2|             3|
|2016|    4|        101| ATL|   1|   NJ|2016-04-01|2016-05-04|   2| 49|     OS|00089|      B2|             4|
|2016|    4|        101| ATL|   1|   NY|2016-04-01|2016-06-06|   2| 43|     OS|00089|      B2|             5|
|2016|    

In [166]:
immigration_table.write.partitionBy("month", "state").parquet("data/tables/immigration", 'overwrite')

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

Loop over every table and verify that it contains data after the pipeline has run.

In [None]:
for mytable in ["country", "port", "immigration_valid", "visa", "mode"]:
    # Count the rows of each registered view without overwriting mode_code_table
    row_count = spark.sql("SELECT * FROM " + mytable).count()
    if row_count < 1:
        raise ValueError(f"Data quality check failed. {mytable} returned no results")
    print(f"Data quality on table {mytable} check passed")

Data quality on table country check passed
Data quality on table port check passed
Data quality on table immigration_valid check passed
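
The row-count check can also be separated from Spark so that it is easy to unit test and reports every empty table at once instead of stopping at the first. The sketch below is one possible shape for that; `check_row_counts` is a hypothetical helper, and the counts are the ones shown earlier in this notebook for the country, visa, and mode tables.

```python
def check_row_counts(row_counts):
    """Raise ValueError listing every table that has no rows.

    row_counts maps a table name to its row count.
    """
    empty = [name for name, count in row_counts.items() if count < 1]
    if empty:
        raise ValueError(f"Data quality check failed, empty tables: {empty}")
    return sorted(row_counts)


# Counts as shown earlier in this notebook
print(check_row_counts({"country": 233, "visa": 3, "mode": 4}))
# ['country', 'mode', 'visa']
```

In the notebook, the dictionary could be filled by calling `count()` on each registered view before passing it to the function.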


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

immigration table:

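    year: 4-digit year of entry into the US

    month: numeric month of entry into the US

    citizenship: code of the immigrant's country of citizenship (see country table)

    port: code of the port of entry (see port table)

    mode: code of the mode of entry (see mode table)

    state: US state code from the i94addr column (see states table)

    arrivedate: immigrant's arrival date

    depardate: immigrant's departure date

    age: immigrant's age

    visa: code of the immigrant's visa category (see visa table)

    airline: airline the immigrant arrived on

    fltno: immigrant's flight number

    visatype: immigrant's visa type, such as B1 or B2

    immigration_id: generated unique identifier for the row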





date table:

    date: the date
    
    day: day of the month
    
    week: which week of the year
    
    month: which month of the year
    
    year: year number
    
    weekday: day of the week (0 = Monday through 6 = Sunday)

states table:

    code: State Code 

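    Median_Age: average of the Median Age values of the state's rows in the US cities demographics data

    Total_Population: sum of Total Population over the state's rows in the US cities demographics data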

visa table:

    code: visa category code
    
    visa: visa category description

mode table:

    code: mode code
    
    mode: mode of entry, e.g. Air, Sea, or Land

port table:

    code: port code
    
    port: port name

country table:

    code: country code
    
    country: country name

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

1. Spark is designed for data sets that are too large to fit on one computer. As the data grows, we need a mechanism for workloads that a single machine cannot handle. The immigration data is already large and keeps growing over time, so I think Spark is a good choice.
2. Because the requirement for data accuracy is not very high, I think the data should be updated monthly. A daily update would be more frequent than needed.
3. If the data size increased 100 times, I would still choose Spark, but I would partition the immigration parquet output differently when saving it. If the data had to populate a dashboard on a daily basis, I would use Airflow to manage the schedule and task dependencies. If the database had to be accessed by 100+ people, PostgreSQL would be a good choice.