# US Immigration Data Warehouse
### Data Engineering Capstone Project

#### Project Summary
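This project builds a data warehouse on U.S. immigration (I-94) data for April 2016. The data is enriched with dimensions for countries, ports, states, visa types, modes of arrival, city demographics, airports and average monthly temperatures. The ETL is written in PySpark, and the resulting star schema is stored as Parquet files.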

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

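```python
# Do all imports and installs here
# The wildcard import provides functions such as col, lit, expr, to_date and sum used below
# (note that it shadows Python built-ins such as sum)
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pandas as pd
```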

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

##### Ad. Scope
The goal of this project is to enrich U.S. immigration data with additional dimensions, which widens the range of questions analysts can answer with it. The model is a star schema with a single fact table. I used Spark to clean and transform the data and to build the data warehouse, because it handles operations on large data sets with ease; as a consequence, the warehouse tables are stored as Parquet files. The intended use case is to look at immigration through the lens of the different dimensions and their relationships, for example to check whether immigration is related to demographics or to temperature. In other words, the warehouse lets us look for correlations such as those between large agglomerations and immigration, or between warmer places and immigration.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

##### Ad. Describe and Gather Data 
The immigration data (used for the fact table) comes from the US National Tourism and Trade Office and is provided in SAS7BDAT format. The temperature data comes from Kaggle and contains temperatures for cities and countries around the world. The demographic data comes from OpenSoft and includes information such as male population, female population, median age and race.
There is also a dataset with airport codes. Finally, several dimension tables were built from the I94_SAS_Labels_Descriptions.SAS file (for example visa information, countries, states and modes of arrival (sea/land/air)).

Description of the most important data sets:

immigration - Report contains international visitor arrival statistics by world regions and select countries (including top 20), type of visa, mode of transportation, age groups, states visited (first intended address only), and the top ports of entry (for select countries). Data sources include: Overseas DHS/CBP I-94 Program data; Canadian visitation data (Stats Canada) and Mexican visitation data (Banco de Mexico).

demographic - This dataset contains information about the demographics of all US cities and census-designated places with a population greater or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey.

temperature - This dataset comes from Kaggle and contains monthly average land temperatures by city. More info: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data

airports - This is a simple table of airport codes and corresponding cities.


# Important!
#### The whole ETL process can be run from the console with the following command:
`python etl.py`
##### The same process, broken down into separate steps, is shown in this Jupyter Notebook.
##### When run from the console, the process also logs its most important steps.
##### A preview of this log is shown below:

![log_ETL_process](static_files/log_capstone.PNG)

##### In the first step, we load all the data that is directly available in files

```python
# Paths to all input files
sas_label_descriptions_path = "csv_data/I94_SAS_Labels_Descriptions.SAS"
immigration_path = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
temperature_path = '../../data2/GlobalLandTemperaturesByCity.csv'
demographics_path = 'csv_data/us-cities-demographics.csv'
airports_path = 'csv_data/airport-codes_csv.csv'
```

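```python
# Spark session with the spark-sas7bdat package, needed to read the SAS7BDAT immigration file
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)
```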

```python
# The SAS file is read with spark-sas7bdat; the CSV files have a header row
# and, without inferSchema, all their columns are read as strings
immigration_spark = spark.read.format('com.github.saurfang.sas.spark').load(immigration_path)
demographics_spark = spark.read.format("csv").option("header", "true").option("delimiter", ';').load(demographics_path)
temperature_spark = spark.read.format("csv").option("header", "true").option("delimiter", ',').load(temperature_path)
airports_spark = spark.read.format("csv").option("header", "true").option("delimiter", ',').load(airports_path)
```

##### Let's show raw imported data 

```python
immigration_spark.show()
```

```
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE
```

```python
demographics_spark.show()
```

```
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   Silver Spring|      Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino| 25924|
|          Quincy| Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White| 58723|
|          Hoover|       Alabama|      38.5|
```

```python
temperature_spark.show()
```

```
+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|             10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01| 14.050999999999998|
```

```python
airports_spark.show()
```

```
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|
```

### Step 2: Explore and Assess the Data

##### First, we create a dimension table with countries and their country codes, extracted from the I94_SAS_Labels_Descriptions.SAS file. The file also lists many invalid codes, so we keep only the valid ones. Besides the Spark data frame with the structure required by the data warehouse, the code below builds a list of valid country codes, which we will need later to filter the fact table.

```python
schema = StructType([
    StructField("country_code", StringType(), False),
    StructField("country_name", StringType(), False)])

with open(sas_label_descriptions_path) as f:
    lines = f.readlines()

valid_country_codes_raw = []
valid_country_codes_list = []

# The slice lines[9:245] covers the block of valid "code = 'NAME'" country entries
for line in lines[9:245]:
    parts = line.split('=')
    code = int(parts[0].strip())
    valid_country_codes_list.append(code)
    valid_country_codes_raw.append({
        'country_code': code,
        'country_name': parts[1].strip().replace("'", "")
    })

# The label of the first entry (582) carries extra notes; keep only the country name
valid_country_codes_raw[0]['country_name'] = "MEXICO"

countries_dimension = spark.createDataFrame(pd.DataFrame(valid_country_codes_raw), schema=schema)
```

```python
countries_dimension.show()
```

```
+------------+---------------+
|country_code|   country_name|
+------------+---------------+
|         582|         MEXICO|
|         236|    AFGHANISTAN|
|         101|        ALBANIA|
|         316|        ALGERIA|
|         102|        ANDORRA|
|         324|         ANGOLA|
|         529|       ANGUILLA|
|         518|ANTIGUA-BARBUDA|
|         687|     ARGENTINA |
|         151|        ARMENIA|
|         532|          ARUBA|
|         438|      AUSTRALIA|
|         103|        AUSTRIA|
|         152|     AZERBAIJAN|
|         512|        BAHAMAS|
|         298|        BAHRAIN|
|         274|     BANGLADESH|
|         513|       BARBADOS|
|         104|        BELGIUM|
|         581|         BELIZE|
+------------+---------------+
only showing top 20 rows
```
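The code above relies on hard-coded line ranges such as `lines[9:245]`, which silently break if the labels file changes. Below is a minimal sketch of a more robust alternative. It assumes that each block in I94_SAS_Labels_Descriptions.SAS starts with a `value <name>` line, holds entries of the form `code = 'label'`, and ends with a `;`. The helper `parse_label_block`, the regular expression and the block names in the sample (`i94cntyl`, `i94model`) are assumptions for illustration, not names taken from this project.

```python
import re

# Assumed entry shape inside a SAS "value" block, for example:
#    582 =  'MEXICO'        or        'ALC'\t=\t'ALCAN, AK    '
LABEL_LINE = re.compile(r"^\s*'?([^'=\s]+)'?\s*=\s*'([^']*)'")


def parse_label_block(lines, block_name):
    """Return {code: label} for the `value <block_name>` block (hypothetical helper).

    Scans from the block's `value` line to the terminating `;` instead of
    relying on fixed line offsets.
    """
    mapping = {}
    inside = False
    for line in lines:
        stripped = line.strip()
        if not inside:
            # A block header is assumed to look like "value i94cntyl" or "value $i94prtl"
            inside = stripped.startswith("value") and block_name in stripped
            continue
        match = LABEL_LINE.match(line)
        if match:
            mapping[match.group(1)] = match.group(2).strip()
        if stripped.endswith(";"):
            # The last entry may share its line with the closing semicolon
            break
    return mapping


# Hypothetical excerpt in the assumed format
sample = [
    "/* I94CIT & I94RES */",
    "  value i94cntyl",
    "   582 =  'MEXICO'",
    "   236 =  'AFGHANISTAN'",
    ";",
    "  value i94model",
    "\t1 = 'Air'",
    "\t9 = 'Not reported' ;",
]
print(parse_label_block(sample, "i94cntyl"))  # {'582': 'MEXICO', '236': 'AFGHANISTAN'}
print(parse_label_block(sample, "i94model"))  # {'1': 'Air', '9': 'Not reported'}
```

The resulting dictionary could feed both the dimension rows and the list of valid codes, and the same helper could serve the port, mode, state and visa blocks. Labels that contain an apostrophe would be cut short by this regular expression.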



##### Second, we create a dimension table with port codes and the corresponding city/area names, again extracted from I94_SAS_Labels_Descriptions.SAS. Many codes are invalid (placeholders marked "No PORT Code", collapsed codes, or entries without a state), so we keep only the valid ones. The code below builds the Spark data frame for the data warehouse and a list of valid port codes, which we will need later.

```python
schema = StructType([
    StructField("port_code", StringType(), False),
    StructField("port_city", StringType(), False)])

with open(sas_label_descriptions_path) as f:
    content = [x.strip() for x in f.readlines()]

# Port entries have the form 'CODE' = 'CITY, STATE'
ports = content[302:962]
splitted_ports = [port.split("=") for port in ports]
port_codes = [x[0].replace("'", "").strip() for x in splitted_ports]
port_locations = [x[1].replace("'", "").strip() for x in splitted_ports]
port_cities = [x.split(",")[0] for x in port_locations]
port_states = [x.split(",")[-1] for x in port_locations]
valid_port_codes_raw = pd.DataFrame({"port_code": port_codes, "port_city": port_cities, "port_state": port_states})

# Drop placeholder and collapsed codes
valid_port_codes_raw = valid_port_codes_raw[~valid_port_codes_raw['port_city'].str.contains('No PORT Code')]
valid_port_codes_raw = valid_port_codes_raw[~valid_port_codes_raw['port_city'].str.contains('Collapsed')]
# Drop entries without a comma-separated state (their city and state parts are identical)
irregular_ports_df = valid_port_codes_raw[valid_port_codes_raw["port_city"] == valid_port_codes_raw["port_state"]]
valid_port_codes_raw = valid_port_codes_raw[~valid_port_codes_raw['port_code'].isin(irregular_ports_df.port_code.tolist())]
valid_port_codes_list = valid_port_codes_raw['port_code'].tolist()

ports_dimension = spark.createDataFrame(valid_port_codes_raw[['port_code', 'port_city']], schema=schema)
```

```python
ports_dimension.show()
```

```
+---------+--------------------+
|port_code|           port_city|
+---------+--------------------+
|      ALC|               ALCAN|
|      ANC|           ANCHORAGE|
|      BAR|BAKER AAF - BAKER...|
|      DAC|       DALTONS CACHE|
|      PIZ|DEW STATION PT LA...|
|      DTH|        DUTCH HARBOR|
|      EGL|               EAGLE|
|      FRB|           FAIRBANKS|
|      HOM|               HOMER|
|      HYD|               HYDER|
|      JUN|              JUNEAU|
|      5KE|           KETCHIKAN|
|      KET|           KETCHIKAN|
|      MOS|MOSES POINT INTER...|
|      NIK|             NIKISKI|
|      NOM|                 NOM|
|      PKC|         POKER CREEK|
|      ORI|      PORT LIONS SPB|
|      SKA|             SKAGWAY|
|      SNP|     ST. PAUL ISLAND|
+---------+--------------------+
only showing top 20 rows
```
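The port filtering above can be summarized as a single function applied to one label. Here is a minimal pure-Python sketch that approximates the pandas logic: the first comma-separated part is the city, the last part is the state, and placeholders (`No PORT Code`), collapsed codes (`Collapsed`) and labels without a comma-separated state are discarded. The function name `split_port_location` is hypothetical, and unlike the notebook it checks the markers in the whole label rather than only in the city part.

```python
def split_port_location(location):
    """Split a port label such as 'ALCAN, AK' into (city, state).

    Returns None for the kinds of entries the notebook drops: placeholders,
    collapsed codes and labels without a comma-separated state.
    """
    if "No PORT Code" in location or "Collapsed" in location:
        return None
    parts = [part.strip() for part in location.split(",")]
    if len(parts) < 2:
        # Without a comma, the notebook's city and state parts would be identical
        return None
    return parts[0], parts[-1]


# Illustrative labels in the 'CITY, STATE' format
labels = {"ALC": "ALCAN, AK", "ANC": "ANCHORAGE, AK"}
ports = {code: split_port_location(label) for code, label in labels.items()}
print(ports)  # {'ALC': ('ALCAN', 'AK'), 'ANC': ('ANCHORAGE', 'AK')}
```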



##### Third, we create a dimension table with mode codes and their names. By mode I mean the mode of arrival: air, sea, land or not reported. The code below extracts this information from I94_SAS_Labels_Descriptions.SAS and builds both the Spark data frame for the data warehouse and a list of valid mode codes.

```python
schema = StructType([
    StructField("mode_id", IntegerType(), False),
    StructField("mode_name", StringType(), False)])

with open(sas_label_descriptions_path) as f:
    lines = f.readlines()

valid_mode_codes_raw = []
valid_mode_codes_list = []

# The slice lines[972:976] covers the mode entries; labels may carry a trailing ';'
for line in lines[972:976]:
    parts = line.split('=')
    code = int(parts[0].strip())
    valid_mode_codes_list.append(code)
    valid_mode_codes_raw.append({
        'mode_id': code,
        'mode_name': parts[1].replace(';', '').replace("'", "").strip()
    })

modes_dimension = spark.createDataFrame(pd.DataFrame(valid_mode_codes_raw)[['mode_id', 'mode_name']], schema=schema)
```

```python
modes_dimension.show()
```

```
+-------+------------+
|mode_id|   mode_name|
+-------+------------+
|      1|         Air|
|      2|         Sea|
|      3|        Land|
|      9|Not reported|
+-------+------------+
```



##### Fourth, we create a dimension table with state codes and the corresponding state names, extracted from I94_SAS_Labels_Descriptions.SAS. The code below builds the Spark data frame for the data warehouse and a list of valid state codes, which we will need later.

```python
schema = StructType([
    StructField("state_code", StringType(), False),
    StructField("state_name", StringType(), False)])

with open(sas_label_descriptions_path) as f:
    lines = f.readlines()

valid_state_codes_raw = []
valid_state_codes_list = []

# The slice lines[981:1036] covers the state entries, e.g. 'AL'='ALABAMA'
for line in lines[981:1036]:
    parts = line.split('=')
    code = parts[0].strip().replace('\t', '').replace("'", "")
    valid_state_codes_list.append(code)
    valid_state_codes_raw.append({
        'state_code': code,
        'state_name': parts[1].replace("'", "").strip()
    })

states_dimension = spark.createDataFrame(pd.DataFrame(valid_state_codes_raw), schema=schema)
```

```python
states_dimension.show()
```

```
+----------+-----------------+
|state_code|       state_name|
+----------+-----------------+
|        AL|          ALABAMA|
|        AK|           ALASKA|
|        AZ|          ARIZONA|
|        AR|         ARKANSAS|
|        CA|       CALIFORNIA|
|        CO|         COLORADO|
|        CT|      CONNECTICUT|
|        DE|         DELAWARE|
|        DC|DIST. OF COLUMBIA|
|        FL|          FLORIDA|
|        GA|          GEORGIA|
|        GU|             GUAM|
|        HI|           HAWAII|
|        ID|            IDAHO|
|        IL|         ILLINOIS|
|        IN|          INDIANA|
|        IA|             IOWA|
|        KS|           KANSAS|
|        KY|         KENTUCKY|
|        LA|        LOUISIANA|
+----------+-----------------+
only showing top 20 rows
```



##### Next, we create a dimension table with visa codes and the corresponding visa types: business, pleasure or student. The code below extracts this information from I94_SAS_Labels_Descriptions.SAS and builds both the Spark data frame for the data warehouse and a list of valid visa codes.

```python
schema = StructType([
    StructField("visa_id", IntegerType(), False),
    StructField("visa_type", StringType(), False)])

with open(sas_label_descriptions_path) as f:
    lines = f.readlines()

valid_visa_codes_raw = []
valid_visa_codes_list = []

# The slice lines[1046:1049] covers the three visa categories
for line in lines[1046:1049]:
    parts = line.split('=')
    code = int(parts[0].strip())
    valid_visa_codes_list.append(code)
    valid_visa_codes_raw.append({
        'visa_id': code,
        'visa_type': parts[1].strip().replace("'", "")
    })

visa_dimension = spark.createDataFrame(pd.DataFrame(valid_visa_codes_raw), schema=schema)
```

```python
visa_dimension.show()
```

```
+-------+---------+
|visa_id|visa_type|
+-------+---------+
|      1| Business|
|      2| Pleasure|
|      3|  Student|
+-------+---------+
```



##### In this step, we create the main fact table for immigration. We rename the relevant columns, cast them to appropriate data types and drop the raw ones. SAS stores the arrival and departure dates as the number of days since 1960-01-01, so we convert them to proper dates and derive the arrival year, month and day, which we use to partition the table. Finally, we keep only rows whose country, port, mode and visa codes appear in the lists of valid codes prepared above; this also removes rows where any of these codes is missing.

```python
immigration_spark = (
    immigration_spark
    .withColumnRenamed("i94addr", "code_state")
    .withColumnRenamed("i94port", "code_port")
    # Numeric codes are stored as doubles in the SAS file; cast them to integers
    .withColumn("code_visa", col("i94visa").cast("integer")).drop("i94visa")
    .withColumn("code_mode", col("i94mode").cast("integer")).drop("i94mode")
    .withColumn("code_country_origin", col("i94res").cast("integer")).drop("i94res")
    .withColumn("code_country_city", col("i94cit").cast("integer")).drop("i94cit")
    .withColumn("year", col("i94yr").cast("integer")).drop("i94yr")
    .withColumn("month", col("i94mon").cast("integer")).drop("i94mon")
    .withColumn("birth_year", col("biryear").cast("integer")).drop("biryear")
    .withColumn("age", col("i94bir").cast("integer")).drop("i94bir")
    .withColumn("counter_summary", col("count").cast("integer")).drop("count")
    # SAS dates are day offsets from 1960-01-01
    .withColumn("date_base_SAS", to_date(lit("01/01/1960"), "MM/dd/yyyy"))
    .withColumn("arrival_date", expr("date_add(date_base_SAS, arrdate)"))
    .withColumn("departure_date", expr("date_add(date_base_SAS, depdate)"))
    .drop("date_base_SAS", "arrdate", "depdate", "dtadfile")
    # Partition columns derived from the arrival date
    .withColumn("arrival_year", year("arrival_date"))
    .withColumn("arrival_month", month("arrival_date"))
    .withColumn("arrival_day", dayofmonth("arrival_date"))
)

# Keep only rows whose codes are in the valid-code lists (null codes are dropped as well)
immigration_fact = (
    immigration_spark
    .filter(col("code_country_city").isin(valid_country_codes_list))
    .filter(col("code_country_origin").isin(valid_country_codes_list))
    .filter(col("code_port").isin(valid_port_codes_list))
    .filter(col("code_mode").isin(valid_mode_codes_list))
    .filter(col("code_visa").isin(valid_visa_codes_list))
)
```

```python
immigration_fact.show()
```

```
+-----+---------+----------+--------+-----+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+---------+---------+-------------------+-----------------+----+-----+----------+---+---------------+------------+--------------+------------+-------------+-----------+
|cicid|code_port|code_state|visapost|occup|entdepa|entdepd|entdepu|matflag| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|code_visa|code_mode|code_country_origin|code_country_city|year|month|birth_year|age|counter_summary|arrival_date|departure_date|arrival_year|arrival_month|arrival_day|
+-----+---------+----------+--------+-----+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+---------+---------+-------------------+-----------------+----+-----+----------+---+---------------+------------+--------------+------------+-------------+-----------+
| 16.0|      NYC|        MA|    null| null|      O|      O|   null|      M|09302016|  nu
```
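The date handling above relies on SAS storing dates as the number of days since 1960-01-01, so `date_add(date_base_SAS, arrdate)` simply adds that offset to the epoch. Here is a minimal pure-Python sketch of the same conversion and of the derived partition values; the helper names `sas_to_date` and `arrival_partition` are hypothetical.

```python
from datetime import date, timedelta

# SAS dates count days from this epoch
SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(days):
    """Convert a SAS day offset (possibly a float such as 20573.0) to a date.

    Missing values stay missing, as null arrdate/depdate values do in Spark.
    """
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


def arrival_partition(days):
    """Return the (arrival_year, arrival_month, arrival_day) partition values."""
    arrival = sas_to_date(days)
    if arrival is None:
        return None
    return arrival.year, arrival.month, arrival.day


print(sas_to_date(20573.0))        # 2016-04-29
print(arrival_partition(20551.0))  # (2016, 4, 7)
```

Applied to the raw rows shown earlier, `arrdate` 20573.0 (cicid 6) falls on 29 April 2016 and 20551.0 (cicid 7) on 7 April 2016, consistent with the April 2016 input file.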

##### In this step, we create the demographics dimension table. We keep only cities located in valid states and rename the columns to snake_case. The source file has one row per city and race, so we pivot the race column into one column per race (summing the counts and casting them to integers) and fill missing race counts with 0.

```python
# Keep only cities located in valid states
demographics_spark = demographics_spark.filter(demographics_spark["State Code"].isin(valid_state_codes_list))

demographics_dimension = (
    demographics_spark
    .withColumnRenamed("City", "city")
    .withColumnRenamed("State", "state")
    .withColumnRenamed("Median Age", "median_age")
    .withColumnRenamed("Male Population", "male_population")
    .withColumnRenamed("Female Population", "female_population")
    .withColumnRenamed("Total Population", "total_population")
    .withColumnRenamed("Number of Veterans", "number_of_veterans")
    .withColumnRenamed("Foreign-born", "foreign_born")
    .withColumnRenamed("Average Household Size", "average_household_size")
    .withColumnRenamed("State Code", "state_code")
    .withColumnRenamed("Race", "race")
    .withColumnRenamed("Count", "count")
    # One row per city and one column per race; `sum` is pyspark.sql.functions.sum
    .groupBy("city", "state", "median_age", "male_population", "female_population",
             "total_population", "number_of_veterans", "foreign_born",
             "average_household_size", "state_code")
    .pivot("race").agg(sum("count").cast("integer"))
    .withColumnRenamed("American Indian and Alaska Native", "american_indian_and_alaska_native")
    .withColumnRenamed("Asian", "asian")
    .withColumnRenamed("Black or African-American", "black_or_african_american")
    .withColumnRenamed("Hispanic or Latino", "hispanic_or_latino")
    .withColumnRenamed("White", "white")
    # Cities without a row for a given race get 0 instead of null
    .fillna({"american_indian_and_alaska_native": 0,
             "asian": 0,
             "black_or_african_american": 0,
             "hispanic_or_latino": 0,
             "white": 0})
)
```
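The pivot step above turns one row per (city, race) into one row per city with a column per race. Here is a minimal pure-Python sketch of the same reshaping. It is simplified to key on (city, state_code) only instead of all the grouped demographic columns, and `pivot_race_counts` is a hypothetical helper; the two sample rows are taken from the raw demographics output shown earlier.

```python
from collections import defaultdict

# Race values from the source file and the column names used in the dimension
RACE_COLUMNS = {
    "American Indian and Alaska Native": "american_indian_and_alaska_native",
    "Asian": "asian",
    "Black or African-American": "black_or_african_american",
    "Hispanic or Latino": "hispanic_or_latino",
    "White": "white",
}


def pivot_race_counts(rows):
    """Pivot (city, state_code, race, count) rows into one dict of race counts per city.

    Like pivot("race").agg(sum("count")) followed by fillna(0): counts for the
    same city and race are summed, and races without a row stay at 0.
    """
    wide = defaultdict(lambda: dict.fromkeys(RACE_COLUMNS.values(), 0))
    for city, state_code, race, count in rows:
        # CSV values are read as strings, hence the int() conversion
        wide[(city, state_code)][RACE_COLUMNS[race]] += int(count)
    return dict(wide)


rows = [
    ("Silver Spring", "MD", "Hispanic or Latino", "25924"),
    ("Quincy", "MA", "White", "58723"),
]
wide = pivot_race_counts(rows)
print(wide[("Quincy", "MA")]["white"], wide[("Quincy", "MA")]["asian"])  # 58723 0
```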


##### We filter the airports data so that it contains only airports whose IATA code is a valid port code

```python
# Keep only airports whose IATA code is a valid port code
airports_dimension = airports_spark.filter(airports_spark["iata_code"].isin(valid_port_codes_list))
```

```python
airports_dimension.show()
```

```
+------+--------------+--------------------+------------+---------+-----------+----------+-------------------+--------+---------+----------+--------------------+
| ident|          type|                name|elevation_ft|continent|iso_country|iso_region|       municipality|gps_code|iata_code|local_code|         coordinates|
+------+--------------+--------------------+------------+---------+-----------+----------+-------------------+--------+---------+----------+--------------------+
|   57A| seaplane_base|Tokeen Seaplane Base|        null|       NA|         US|     US-AK|             Tokeen|     57A|      TKI|       57A|-133.32699585, 55...|
|  89NY| small_airport|     Maxson Airfield|         340|       NA|         US|     US-NY|     Alexandria Bay|    89NY|      AXB|      89NY|-75.90034, 44.312002|
|  AGGF| small_airport|Fera/Maringe Airport|        null|       OC|         SB|     SB-IS|        Fera Island|    AGGF|      FRE|      null| 159.576996, -8.1075|
|   ANZ| small_airport| Angu
```

##### The temperature data cannot be joined to the immigration data by date, because it contains no measurements for 2016. Instead, we compute the average temperature in each calendar month for each city, across all years, and use that as the temperature dimension. We start with a pandas data frame, because these transformations are easier there, map city names to port codes through the ports dimension, and finally convert the result into a Spark data frame.

```python
schema = StructType([
    StructField("port_code", StringType(), False),
    StructField("month", IntegerType(), False),
    StructField("avg_tempertature", DoubleType(), False)
])

temperature_pandas = pd.read_csv(temperature_path)
# Drop months without a measurement
temperature_pandas = temperature_pandas[temperature_pandas['AverageTemperature'].notnull()]
# `dt` is formatted as YYYY-MM-DD, e.g. 1743-11-01
temperature_pandas['month'] = pd.to_datetime(temperature_pandas['dt'], format="%Y-%m-%d").dt.month
# Average per city and calendar month over all years
# (same-named cities in different countries are averaged together)
temperature_pandas = temperature_pandas.groupby(['City', 'month'])[['AverageTemperature']].mean().reset_index()
temperature_pandas['City'] = temperature_pandas['City'].str.upper()
# Attach port codes by matching the upper-cased city name with the port city
temperature_pandas = temperature_pandas.merge(ports_dimension.toPandas(), how='inner',
                                              left_on='City', right_on='port_city')[['port_code', 'month', 'AverageTemperature']]
temperature_pandas = temperature_pandas.rename({'AverageTemperature': 'avg_tempertature'}, axis=1)
temperature_pandas['avg_tempertature'] = temperature_pandas['avg_tempertature'].round(decimals=2)
temperature_dimension = spark.createDataFrame(temperature_pandas, schema)
```

```python
temperature_dimension.show()
```

```
+---------+-----+----------------+
|port_code|month|avg_tempertature|
+---------+-----+----------------+
|      ABE|    1|            4.53|
|      ABE|    2|            4.33|
|      ABE|    3|            4.67|
|      ABE|    4|            5.94|
|      ABE|    5|            8.21|
|      ABE|    6|           10.97|
|      ABE|    7|           13.06|
|      ABE|    8|           13.37|
|      ABE|    9|           11.91|
|      ABE|   10|            9.46|
|      ABE|   11|            7.31|
|      ABE|   12|            5.69|
|      AKR|    1|           -3.21|
|      CAK|    1|           -3.21|
|      AKR|    2|           -2.12|
|      CAK|    2|           -2.12|
|      AKR|    3|            2.56|
|      CAK|    3|            2.56|
|      AKR|    4|            8.87|
|      CAK|    4|            8.87|
+---------+-----+----------------+
only showing top 20 rows
```
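The temperature dimension is a monthly climatology: for each city and calendar month, the mean of all non-missing monthly averages across the years. Here is a minimal pure-Python sketch of that aggregation, before the mapping to port codes. `monthly_climatology` is a hypothetical helper, and its input is assumed to be (dt, City, AverageTemperature) tuples as in the raw file, with None for missing temperatures; the sample rows come from the raw temperature output shown earlier.

```python
from collections import defaultdict
from statistics import mean


def monthly_climatology(records):
    """Average temperature per (CITY, month) across all years, rounded to 2 decimals.

    Rows with a missing temperature are skipped, as in the notebook; city names
    are upper-cased so they can later be matched against port cities.
    """
    buckets = defaultdict(list)
    for dt, city, temperature in records:
        if temperature is None:
            continue
        month = int(dt[5:7])  # 'YYYY-MM-DD' -> MM
        buckets[(city.upper(), month)].append(temperature)
    return {key: round(mean(values), 2) for key, values in buckets.items()}


records = [
    ("1743-11-01", "Århus", 6.068),
    ("1743-12-01", "Århus", None),
    ("1744-04-01", "Århus", 5.7879999999999985),
    ("1744-05-01", "Århus", 10.644),
]
climate = monthly_climatology(records)
print(climate)  # {('ÅRHUS', 11): 6.07, ('ÅRHUS', 4): 5.79, ('ÅRHUS', 5): 10.64}
```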



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

I implemented a star schema. It is a typical data warehouse schema and, together with the snowflake schema, one of the two most popular ones. I used Spark to clean the data, build the structure of the data warehouse and handle all operations on data frames and files; each warehouse table is saved as a Parquet file. I chose Spark because it makes manipulating large files easy. Although the data for this project is not very large yet, I decided that it is large enough to justify using Spark.

Here is the schema of my model:
![schema_postgres_database](static_files/scheme_capstone.png)
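To illustrate the kind of question the star schema is meant to answer, here is a minimal sketch that uses an in-memory SQLite database as a stand-in for the Parquet tables; in the project itself the tables would be read and queried with Spark. The three tables are heavily simplified, the temperature rows are taken from the temperature dimension output above, and the fact rows are hypothetical. The query counts arrivals per visa type and averages the temperature at the arrival port in the arrival month.

```python
import sqlite3


def arrivals_by_visa_and_temperature(conn):
    """Join the fact table to two dimensions and aggregate per visa type."""
    query = """
        SELECT v.visa_type,
               COUNT(*) AS arrivals,
               AVG(t.avg_tempertature) AS avg_port_temperature
        FROM immigration_fact AS f
        JOIN visa_dimension AS v ON f.code_visa = v.visa_id
        JOIN temperature_dimension AS t
          ON f.code_port = t.port_code AND f.arrival_month = t.month
        GROUP BY v.visa_type
        ORDER BY v.visa_type
    """
    return conn.execute(query).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE immigration_fact (cicid REAL, code_port TEXT, code_visa INTEGER, arrival_month INTEGER);
    CREATE TABLE visa_dimension (visa_id INTEGER, visa_type TEXT);
    -- column name spelled as in the notebook's temperature_dimension
    CREATE TABLE temperature_dimension (port_code TEXT, month INTEGER, avg_tempertature REAL);
""")
conn.executemany("INSERT INTO visa_dimension VALUES (?, ?)",
                 [(1, "Business"), (2, "Pleasure"), (3, "Student")])
conn.executemany("INSERT INTO temperature_dimension VALUES (?, ?, ?)",
                 [("ABE", 4, 5.94), ("AKR", 4, 8.87)])
# Hypothetical fact rows
conn.executemany("INSERT INTO immigration_fact VALUES (?, ?, ?, ?)",
                 [(1.0, "ABE", 2, 4), (2.0, "AKR", 2, 4), (3.0, "AKR", 1, 4)])

for row in arrivals_by_visa_and_temperature(conn):
    print(row)
```

Each question of this shape only needs the fact table plus the dimensions it mentions, which is the main reason for choosing a star schema here.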



#### 3.2 Mapping Out Data Pipelines

Firstly, we load all the data and transform it into Spark data frames:

  - load the raw data: immigration, temperature, demographics and airports
  
  
Secondly:
  - clean all the data
  - prepare the fact table and all the dimension tables
  
  
Thirdly:
  - for integrity and consistency, insert into the fact table only rows whose dimension keys exist in the dimension tables
  - save all the tables as Parquet files
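The integrity rule in the third step corresponds to Spark's `left_semi` join: a fact row survives only if its key has a match in the dimension, and no dimension columns are added to it. Here is a minimal pure-Python sketch of that rule applied across several dimensions; the helper names are hypothetical and the rows are plain dictionaries with made-up values.

```python
def semi_join(fact_rows, key, valid_keys):
    """Keep fact rows whose `key` value exists in `valid_keys` (like a left_semi join)."""
    valid = set(valid_keys)
    # A missing key (None) never matches, just as null never satisfies an equi-join
    return [row for row in fact_rows if row.get(key) is not None and row[key] in valid]


def enforce_dimension_keys(fact_rows, constraints):
    """Apply one semi-join per {fact_column: valid dimension keys} entry."""
    for key, valid_keys in constraints.items():
        fact_rows = semi_join(fact_rows, key, valid_keys)
    return fact_rows


# Hypothetical fact rows
facts = [
    {"cicid": 1.0, "code_port": "NYC", "code_visa": 2},
    {"cicid": 2.0, "code_port": "XXX", "code_visa": 2},
    {"cicid": 3.0, "code_port": "ATL", "code_visa": None},
]
kept = enforce_dimension_keys(facts, {"code_port": ["NYC", "ATL"], "code_visa": [1, 2, 3]})
print([row["cicid"] for row in kept])  # [1.0]
```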


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### Filtering the fact table against all dimension tables and saving it to Parquet

```python
# Each left_semi join keeps only the fact rows that have a match in that dimension,
# without adding any dimension columns to the fact table
immigration_fact = (
    immigration_fact
    .join(demographics_dimension, immigration_fact["code_state"] == demographics_dimension["state_code"], "left_semi")
    .join(airports_dimension, immigration_fact["code_port"] == airports_dimension["iata_code"], "left_semi")
    .join(ports_dimension, immigration_fact["code_port"] == ports_dimension["port_code"], "left_semi")
    .join(temperature_dimension, (immigration_fact["code_port"] == temperature_dimension["port_code"])
          & (immigration_fact["arrival_month"] == temperature_dimension["month"]), "left_semi")
    .join(countries_dimension, immigration_fact["code_country_origin"] == countries_dimension["country_code"], "left_semi")
    .join(visa_dimension, immigration_fact["code_visa"] == visa_dimension["visa_id"], "left_semi")
    .join(modes_dimension, immigration_fact["code_mode"] == modes_dimension["mode_id"], "left_semi")
    .join(states_dimension, immigration_fact["code_state"] == states_dimension["state_code"], "left_semi")
)

# Partition the fact table by arrival date
immigration_fact.write.mode('overwrite') \
    .partitionBy("arrival_year", "arrival_month", "arrival_day") \
    .parquet("./DWH_IMMIGRATION_PARQUET/immigration_fact.parquet")
```
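`partitionBy` writes Hive-style directories named `column=value`, one level per partition column, so queries that filter on the arrival date can skip whole directories (partition pruning). Here is a minimal sketch of the directory a given row lands in; `partition_path` is a hypothetical helper.

```python
def partition_path(base, arrival_year, arrival_month, arrival_day):
    """Directory written by partitionBy("arrival_year", "arrival_month", "arrival_day")."""
    return (f"{base}/arrival_year={arrival_year}"
            f"/arrival_month={arrival_month}"
            f"/arrival_day={arrival_day}")


print(partition_path("./DWH_IMMIGRATION_PARQUET/immigration_fact.parquet", 2016, 4, 29))
# ./DWH_IMMIGRATION_PARQUET/immigration_fact.parquet/arrival_year=2016/arrival_month=4/arrival_day=29
```

When the table is read back with `spark.read.parquet(...)`, Spark restores the three partition columns from these directory names, and a filter such as `arrival_day = 29` lets it read only the matching directories.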

##### Saving our dimension tables into parquet

```python
# Each dimension table is written to its own Parquet directory
dimensions = {
    "countries_dimension": countries_dimension,
    "ports_dimension": ports_dimension,
    "modes_dimension": modes_dimension,
    "states_dimension": states_dimension,
    "visa_dimension": visa_dimension,
    "demographics_dimension": demographics_dimension,
    "airports_dimension": airports_dimension,
    "temperature_dimension": temperature_dimension,
}
for name, dimension in dimensions.items():
    dimension.write.mode('overwrite').parquet(f"./DWH_IMMIGRATION_PARQUET/{name}.parquet")
```


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [35]:
def quality_check_rows(spark, path_parquet, table_name):
    """
    Function for first quality check.
    It validates if given table has any rows -
    If table has any rows it means that ETL process worked fine
    and the rows are in the table - Then the check is passed.
    """   
    
    df = spark.read.parquet(path_parquet)
    rows = df.count()
    
    if rows == 0:
        print(f" - Table {table_name} has 0 rows - quality check failed")
    else:
        print(f" - Table {table_name} has {rows} rows - quality check passed")
    return None


def quality_check_joins(spark, path_fact, path_dim_to_check, fact_col, dim_col):
    """
    Second quality check: verify that every distinct fact_col value in the
    fact table has a matching dim_col value in the dimension table.

    A left_anti join keeps only the fact values without a match, so an
    empty result means the join between the two tables works correctly.
    """

    df = spark.read.parquet(path_fact)
    df_to_check = spark.read.parquet(path_dim_to_check)

    # Count distinct fact keys that do not appear in the dimension table
    unmatched_values = df.select(col(fact_col)).distinct() \
                         .join(df_to_check, df[fact_col] == df_to_check[dim_col], "left_anti") \
                         .count()

    if unmatched_values == 0:
        print(" - Quality check passed - join between those two tables works correctly")
    else:
        print(f" - Quality check failed - {unmatched_values} distinct {fact_col} values have no match in {dim_col}")
    return None
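To make the `left_anti` logic of `quality_check_joins` concrete, here is a plain-Python sketch of what it counts, using hypothetical keys. `None` stands in for SQL NULL: NULL never satisfies an equality join condition, so a NULL `code_state` in the fact table would survive the anti-join and make the check fail.

```python
def orphan_keys(fact_keys, dim_keys):
    """Return the distinct fact keys that have no match among the dimension keys.

    Mirrors df.select(fact_col).distinct().join(dim, ..., "left_anti"):
    None plays the role of NULL, which never equals anything in a join,
    so a None fact key is always reported as an orphan.
    """
    dim_set = {key for key in dim_keys if key is not None}
    return {key for key in set(fact_keys) if key is None or key not in dim_set}


# Hypothetical state codes, for illustration only
fact = ["NY", "CA", "NY", "XX", None]
dim = ["NY", "CA", "TX", None]
print(len(orphan_keys(fact, dim)))  # 2 ('XX' and None)
```

If unknown states are acceptable in the fact table, filtering with `col(fact_col).isNotNull()` before the join would exclude them from the check.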

##### 1) Count check to ensure completeness of country dimension

In [36]:
quality_check_rows(spark, "./DWH_IMMIGRATION_PARQUET/countries_dimension.parquet", 'country_dimension')

 - Table country_dimension has 236 rows - quality check passed


##### 2) Count check to ensure completeness of demographic dimension

In [37]:
quality_check_rows(spark, "./DWH_IMMIGRATION_PARQUET/demographics_dimension.parquet", 'demographic_dimension')

 - Table demographic_dimension has 596 rows - quality check passed


##### 3) Count check to ensure completeness of immigration fact

In [38]:
quality_check_rows(spark, "./DWH_IMMIGRATION_PARQUET/immigration_fact.parquet", 'immigration_fact')

 - Table immigration_fact has 1387262 rows - quality check passed
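`quality_check_rows` only proves that a table is not empty. A stricter source/count check, sketched here with a hypothetical helper and made-up counts, compares the loaded row count with the row count of the raw source read before cleaning. The tolerance is an assumption, because cleaning is expected to drop some rows.

```python
def check_completeness(source_rows, loaded_rows, max_drop_ratio=0.0):
    """Hypothetical source/count check for a table loaded from one source.

    Passes when the loaded table has no more rows than the source and
    keeps at least (1 - max_drop_ratio) of them. Cleaning may legitimately
    drop rows, so max_drop_ratio is a per-table tolerance to choose.
    """
    if source_rows <= 0:
        raise ValueError("source_rows must be positive")
    if loaded_rows > source_rows:
        # More rows than the source usually signals duplication, e.g. a fan-out join
        return False
    dropped_ratio = (source_rows - loaded_rows) / source_rows
    return dropped_ratio <= max_drop_ratio


# Made-up counts, for illustration only
print(check_completeness(1_000_000, 990_000, max_drop_ratio=0.02))  # True
print(check_completeness(1_000_000, 900_000, max_drop_ratio=0.02))  # False
```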


##### 4) Integrity check to ensure that the join between the immigration fact table and the demographic dimension table works properly

In [39]:
quality_check_joins(spark, path_fact="./DWH_IMMIGRATION_PARQUET/immigration_fact.parquet", path_dim_to_check="./DWH_IMMIGRATION_PARQUET/demographics_dimension.parquet", fact_col='code_state', dim_col='state_code')

 - Quality check passed - join between those two tables works correctly


##### 5) Integrity check to ensure that the join between the immigration fact table and the state dimension table works properly

In [40]:
quality_check_joins(spark, path_fact="./DWH_IMMIGRATION_PARQUET/immigration_fact.parquet", path_dim_to_check="./DWH_IMMIGRATION_PARQUET/states_dimension.parquet", fact_col='code_state', dim_col='state_code')

 - Quality check passed - join between those two tables works correctly


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Fact table:
- ___immigration___ - records with immigration data; data comes from the I-94 immigration data files (SAS7BDAT format) after cleaning
    - cicid bigint
    - code_port string 
    - code_state string 
    - visapost string 
    - occup string 
    - entdepa string 
    - entdepd string 
    - entdepu string 
    - matflag string 
    - dtaddto string 
    - gender string 
    - insnum string 
    - airline string 
    - admnum double 
    - fltno string 
    - visatype string 
    - code_visa integer 
    - code_mode integer 
    - code_country_origin integer 
    - code_country_city integer 
    - year integer 
    - month integer 
    - birth_year integer 
    - age integer 
    - counter_summary integer 
    - arrival_date date 
    - departure_date date 
    - arrival_year integer 
    - arrival_month integer 
    - arrival_day integer
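SAS stores dates as the number of days since 1960-01-01. Assuming `arrival_date` and `departure_date` were derived from numeric SAS date columns in the raw I-94 data (the conversion step is not shown in this section), the conversion amounts to the following; `sas_date_to_date` is a hypothetical name.

```python
from datetime import date, timedelta

# SAS date values count days from this epoch
SAS_EPOCH = date(1960, 1, 1)


def sas_date_to_date(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to datetime.date.

    Raw values may be floats such as 20545.0, so they are truncated to int;
    None (a missing value) is passed through unchanged.
    """
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_date_to_date(20545.0))  # 2016-04-01
print(sas_date_to_date(0))        # 1960-01-01
```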
    
 
#### Dimension tables:
- ___demographic___ - demographic data; data comes from us-cities-demographics.csv file after cleaning
    - state_code string
    - city string
    - state string
    - median_age string
    - male_population string
    - female_population string
    - total_population string
    - number_of_veterans string
    - foreign_born string
    - average_household_size string
    - american_indian_and_alaska_native integer
    - asian integer
    - black_or_african_american integer
    - hispanic_or_atino integer
    - white integer
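The race columns look like the result of pivoting a long-format table (one row per city and race, with a count) into one row per city. Assuming that is how they were built, the reshaping can be sketched in plain Python; the helper name, the input field names and the sample values are hypothetical. In PySpark this is usually done with `groupBy(...).pivot(...).sum(...)`.

```python
from collections import defaultdict


def pivot_race_counts(rows):
    """Pivot long-format race rows into one dict of race columns per city.

    Each input row is a dict with hypothetical keys "city", "state_code",
    "race" and "count"; race names become snake_case column names, e.g.
    "Black or African-American" -> "black_or_african_american".
    Counts for the same city and race are summed.
    """
    pivoted = defaultdict(dict)
    for row in rows:
        key = (row["city"], row["state_code"])
        column = row["race"].lower().replace("-", "_").replace(" ", "_")
        pivoted[key][column] = pivoted[key].get(column, 0) + row["count"]
    return dict(pivoted)


# Made-up sample rows, for illustration only
sample = [
    {"city": "City A", "state_code": "XX", "race": "Asian", "count": 10},
    {"city": "City A", "state_code": "XX", "race": "Black or African-American", "count": 20},
    {"city": "City A", "state_code": "XX", "race": "Asian", "count": 5},
]
print(pivot_race_counts(sample))
# {('City A', 'XX'): {'asian': 15, 'black_or_african_american': 20}}
```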
    
   
- ___temperature___ - temperature data; data comes from GlobalLandTemperaturesByCity.csv file after cleaning
    - port_code string
    - month integer
    - avg_tempertature double 
    
    
- ___airport___ - airports data; data comes from airport-codes_csv.csv file after cleaning
    - ident string
    - type string
    - name string
    - elevation_ft string
    - continent string
    - iso_country string
    - iso_region string
    - municipality string
    - gps_code string
    - iata_code string
    - local_code string
    - coordinates string
    
    
- ___state___ - state codes data; data comes from I94_SAS_Labels_Descriptions.SAS file after cleaning
    - state_code string
    - state_name string
    
    
- ___country___ - countries data; data comes from I94_SAS_Labels_Descriptions.SAS file after cleaning
    - country_code string
    - country_name string
    
    
- ___visa___ - visa codes data; data comes from I94_SAS_Labels_Descriptions.SAS file after cleaning
    - visa_id integer
    - visa_type string
    
    
- ___mode___ - modes data; data comes from I94_SAS_Labels_Descriptions.SAS file after cleaning
    - mode_id integer
    - mode_name string
    
    
- ___port___ - ports data; data comes from I94_SAS_Labels_Descriptions.SAS file after cleaning
    - port_code string
    - port_name string
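The state, country, visa, mode and port dimensions are all extracted from `I94_SAS_Labels_Descriptions.SAS`. A minimal parsing sketch, assuming the value labels appear as lines of the form `'CODE' = 'LABEL'` (or with an unquoted numeric code); the regular expression and helper name are hypothetical. The real file also contains comments, which this sketch does not handle, and labels containing an apostrophe would need a more careful pattern.

```python
import re

# Hypothetical pattern for value-label lines such as:  'AAA' = 'LABEL'  or  101 = 'LABEL'
LABEL_LINE = re.compile(r"^\s*'?([^'=]+?)'?\s*=\s*'([^']*)'")


def parse_sas_labels(lines):
    """Collect code -> label pairs from SAS value-label lines.

    Codes and labels are returned as stripped strings; lines that do not
    look like  code = 'label'  (e.g. PROC FORMAT value statements or ';')
    are skipped.
    """
    labels = {}
    for line in lines:
        match = LABEL_LINE.match(line)
        if match:
            labels[match.group(1).strip()] = match.group(2).strip()
    return labels


# Made-up lines in the assumed format
sample = [
    "value sample_fmt",
    "\t'AAA'\t=\t'SAMPLE PORT, XX        '",
    "   101 =  'SAMPLE COUNTRY'",
    ";",
]
print(parse_sas_labels(sample))
# {'AAA': 'SAMPLE PORT, XX', '101': 'SAMPLE COUNTRY'}
```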

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.


##### 1) I used Apache Spark for all of the data cleaning, data manipulation and creation of the data model. I chose this tool because it handles large amounts of data easily, and the project is so widely used that there is a lot of information about it on the web (Apache Spark also has very good documentation). The data model is stored as Parquet files, a compressed columnar format that Spark reads and writes in parallel, so the project scales well as the data grows.

##### 2) Because the data is partitioned by day, it should be updated every day.
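The write cells shown earlier in this section do not include the fact table's partitioning, so this sketch assumes it is written with `partitionBy("arrival_year", "arrival_month", "arrival_day")`. Spark then lays out Hive-style `column=value` directories, and with `spark.sql.sources.partitionOverwriteMode` set to `dynamic` an overwrite only replaces the partitions present in the new data, so a daily run would touch only that day's directory. `daily_partition_path` is a hypothetical helper.

```python
from datetime import date


def daily_partition_path(base_path, day):
    """Build the Hive-style directory Spark's partitionBy would write for one day.

    Assumes integer partition columns arrival_year, arrival_month and
    arrival_day, whose values are written without zero padding.
    """
    return (
        f"{base_path}/arrival_year={day.year}"
        f"/arrival_month={day.month}/arrival_day={day.day}"
    )


print(daily_partition_path("./DWH_IMMIGRATION_PARQUET/immigration_fact.parquet", date(2016, 4, 1)))
# ./DWH_IMMIGRATION_PARQUET/immigration_fact.parquet/arrival_year=2016/arrival_month=4/arrival_day=1
```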


##### a) Apache Spark can still be used if the data increases 100x; the job would simply run on a larger cluster, because Spark scales horizontally by adding worker nodes.

##### b) My project requires a daily data update, so I would use Apache Airflow to schedule the ETL pipeline to run every day and finish before the 7am deadline.

##### c) If 100+ people needed access to the data, I think Apache Hive would be a good choice. It has a low entry barrier: you only need SQL knowledge to navigate Hive well.