# Data Pipeline for creating a data model to analyze the relationship between temperatures in US cities and the number and types of visitors to the USA
### Data Engineering Capstone Project

I prepared a data pipeline that creates tables for analyzing the number and types of visitors to various US cities throughout the year, and how these relate to the historical average temperatures of those cities throughout the year.

In [1]:
import pandas as pd
from pyspark.sql.functions import udf
from datetime import datetime as dt
from pyspark.sql.types import DateType, FloatType, LongType, DoubleType, StringType, IntegerType
import re
from pyspark.sql import functions as F

#### Scope 
I used data about visitors arriving at US airports, along with historical temperature records for US cities throughout the year. I also used data about airport locations and the demographics of US cities. Using Apache Spark, I built a data pipeline that extracts data from the downloaded datasets, performs the necessary filtering and transformations, loads the results into tables according to the data model, and saves them in Parquet format with appropriate partitions.
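
When Spark writes a DataFrame with `DataFrameWriter.partitionBy`, it lays the Parquet output out as Hive-style `column=value` subdirectories. The helper below is a minimal, hypothetical sketch (the name `partition_path` and the example partition columns are assumptions, not part of this pipeline) that computes the directory a given partition would land in. It ignores the escaping Spark applies to special characters in partition values.

```python
import posixpath


def partition_path(base_dir, **partition_values):
    """Return the Hive-style directory for one partition.

    Hypothetical helper: mirrors the `column=value` layout produced by
    Spark's DataFrameWriter.partitionBy. Keyword order defines nesting order.
    Special characters in values are NOT escaped here, unlike in Spark.
    """
    parts = [f"{column}={value}" for column, value in partition_values.items()]
    return posixpath.join(base_dir, *parts)


# Where April 2016 visitor rows would be written if the (assumed)
# partition columns were arrival_year and arrival_month
print(partition_path("output/us_visitors", arrival_year=2016, arrival_month=4))
# output/us_visitors/arrival_year=2016/arrival_month=4
```

In Spark itself, this layout would come from a call such as `df.write.partitionBy("arrival_year", "arrival_month").parquet(path)`; which columns to partition by is a design choice, and partitioning by the columns most often used in filters lets queries skip unrelated directories.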

#### Description of Data
For this purpose, I collected data from the following sources:
* Data about visitors arriving at US airports in 2016 was retrieved from this site: https://travel.trade.gov/research/reports/i94/historical/2016.html
* Monthly average temperatures for cities around the world from 1743 to 2013 were obtained from this source: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data
* Data about airport codes and their locations was retrieved from this site: https://datahub.io/core/airport-codes
* Demographics of US cities, based on 2015 U.S. Census data, were obtained from this source: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/

In [2]:
from pyspark.sql import SparkSession
# The spark-sas7bdat package lets Spark read SAS .sas7bdat files
spark = SparkSession.builder \
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .enableHiveSupport() \
    .getOrCreate()

In [3]:
immigration_data = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
immigration_data.show(5)

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|  6.0|2016.0|   4.0| 692.0| 692.0|    XXX|20573.0|   null|   null|   null|  37.0|    2.0|  1.0|    null|    null| null|      T|   null|      U|   null| 1979.0|10282016|  null|  null|   null| 1.897628485E9| null|      B2|
|  7.0|2016.0|   4.0| 254.0| 276.0|    ATL|20551.0|    1.0|     AL|   null|  25.0|    3.0|  1.0|20130811|     SE

In [4]:
temp_data = spark.read.format("csv").option("header", True).load("../../data2/GlobalLandTemperaturesByCity.csv")
temp_data.show(5)

+----------+------------------+-----------------------------+-----+-------+--------+---------+
|        dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+----------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



In [5]:
ap_data = spark.read.format("csv").option("header", True).load("airport-codes_csv.csv")
ap_data.show(5)

+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|ident|         type|                name|elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|         coordinates|
+-----+-------------+--------------------+------------+---------+-----------+----------+------------+--------+---------+----------+--------------------+
|  00A|     heliport|   Total Rf Heliport|          11|       NA|         US|     US-PA|    Bensalem|     00A|     null|       00A|-74.9336013793945...|
| 00AA|small_airport|Aero B Ranch Airport|        3435|       NA|         US|     US-KS|       Leoti|    00AA|     null|      00AA|-101.473911, 38.7...|
| 00AK|small_airport|        Lowell Field|         450|       NA|         US|     US-AK|Anchor Point|    00AK|     null|      00AK|-151.695999146, 5...|
| 00AL|small_airport|        Epps Airpark|         820|       NA|         US|     

In [6]:
dem_data = spark.read.format("csv").option("header", True).option("delimiter", ";").load("us-cities-demographics.csv")
dem_data.show(5)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|   Silver Spring|     Maryland|      33.8|          40601|            41862|           82463|              1562|       30908|                   2.6|        MD|  Hispanic or Latino|25924|
|          Quincy|Massachusetts|      41.0|          44129|            49500|           93629|              4147|       32935|                  2.39|        MA|               White|58723|
|          Hoover|      Alabama|      38.5|          38040| 

### Assessment of all the datasets

#### Assessing Airports data


The airports dataset covers airports around the world, including those that have no IATA code. Given the scope of the project, I decided to process only airports that are located in the United States and have an IATA code.

In [8]:
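# Empty fields in the CSV are read as null, so test iata_code with isNotNull()
# instead of comparing it to the string "null"
us_ap_data = ap_data.filter(F.col("iso_country") == "US") \
                    .filter(F.col("iata_code").isNotNull()) \
                    .select("iata_code", "municipality")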
us_ap_data.show(5)

+---------+-------------+
|iata_code| municipality|
+---------+-------------+
|      OCA|    Key Largo|
|      PQS|Pilot Station|
|      CSE|Crested Butte|
|      JCY| Johnson City|
|      PMX|       Palmer|
+---------+-------------+
only showing top 5 rows
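
The same filtering rule can be expressed on plain Python records, which makes it easy to unit-test outside Spark. This is a hypothetical sketch: `filter_us_airports` is not part of the pipeline, and a missing IATA code is assumed to arrive as `None` or an empty string.

```python
def filter_us_airports(rows):
    """Keep US airports that have an IATA code.

    Hypothetical helper. Returns (iata_code, municipality) pairs; a missing
    IATA code is assumed to be None or "".
    """
    result = []
    for row in rows:
        if row.get("iso_country") == "US" and row.get("iata_code"):
            result.append((row["iata_code"], row.get("municipality")))
    return result


sample = [
    {"iso_country": "US", "iata_code": None, "municipality": "Bensalem"},
    {"iso_country": "US", "iata_code": "OCA", "municipality": "Key Largo"},
    {"iso_country": "DK", "iata_code": "AAR", "municipality": "Aarhus"},  # illustrative non-US row
]
print(filter_us_airports(sample))  # [('OCA', 'Key Largo')]
```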



#### Assessing temperatures data




Some rows have `null` values for `AverageTemperature` and `AverageTemperatureUncertainty`. Because the original file is in CSV format, all columns are loaded as `String` in the dataframe.
* The `dt` column contains dates as strings and needs to be converted to `DateType`.
* `AverageTemperature` and `AverageTemperatureUncertainty` need to be converted from string to `FloatType`.

Here is how this data can be processed during the transformation stage of the pipeline:

In [10]:
def get_datetime(date_string):
    return dt.strptime(date_string, '%Y-%m-%d')
udf_datetime = udf(lambda date_string: get_datetime(date_string), DateType())
# Filtering out rows with null value in AverageTemperature
# Adding date column of DateType
# Converting AverageTemperature and AverageTemperatureUncertainty into Float format
clean_temp_data = temp_data.where(temp_data["AverageTemperature"].isNotNull()) \
                            .withColumn("date", udf_datetime(temp_data.dt)) \
                            .selectExpr(["City as city", 
                                         "Country as country", 
                                         "Latitude as latitude", 
                                         "Longitude as longitude", 
                                         "cast(AverageTemperature as float) as avg_temperature",
                                         "cast(AverageTemperatureUncertainty as float) as avg_temperature_uncertainty"])
clean_temp_data.show(5)
clean_temp_data.printSchema()

+-----+-------+--------+---------+---------------+---------------------------+
| city|country|latitude|longitude|avg_temperature|avg_temperature_uncertainty|
+-----+-------+--------+---------+---------------+---------------------------+
|Århus|Denmark|  57.05N|   10.33E|          6.068|                      1.737|
|Århus|Denmark|  57.05N|   10.33E|          5.788|                      3.624|
|Århus|Denmark|  57.05N|   10.33E|         10.644|                      1.283|
|Århus|Denmark|  57.05N|   10.33E|         14.051|                      1.347|
|Århus|Denmark|  57.05N|   10.33E|         16.082|                      1.396|
+-----+-------+--------+---------+---------------+---------------------------+
only showing top 5 rows

root
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- avg_temperature: float (nullable = true)
 |-- avg_temperature_uncertainty: float (nullable = true)
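
The cell above computes a `date` column but does not keep it in `selectExpr`, so the cleaned output has no date, and the `temperatures` table in the data model also needs `month` and `year`. A minimal sketch of the per-row logic in plain Python follows; `clean_temperature_row` is a hypothetical helper, and rows are assumed to be dicts keyed by the CSV header. In Spark, the same derivation could likely use `F.to_date`, `F.month` and `F.year` instead of a Python UDF.

```python
from datetime import datetime


def clean_temperature_row(row):
    """Return a cleaned temperature record, or None if the temperature is missing.

    Hypothetical helper mirroring the Spark transformation: drops rows with a
    null AverageTemperature, parses `dt`, and derives month and year.
    """
    if row.get("AverageTemperature") in (None, ""):
        return None
    date = datetime.strptime(row["dt"], "%Y-%m-%d").date()
    uncertainty = row.get("AverageTemperatureUncertainty")
    return {
        "city": row["City"],
        "country": row["Country"],
        "latitude": row["Latitude"],
        "longitude": row["Longitude"],
        "avg_temperature": float(row["AverageTemperature"]),
        "avg_temperature_uncertainty": float(uncertainty) if uncertainty else None,
        "date": date,
        "month": date.month,
        "year": date.year,
    }


# First two rows of the file, as shown earlier
raw = [
    {"dt": "1743-11-01", "AverageTemperature": "6.068",
     "AverageTemperatureUncertainty": "1.7369999999999999",
     "City": "Århus", "Country": "Denmark", "Latitude": "57.05N", "Longitude": "10.33E"},
    {"dt": "1743-12-01", "AverageTemperature": None,
     "AverageTemperatureUncertainty": None,
     "City": "Århus", "Country": "Denmark", "Latitude": "57.05N", "Longitude": "10.33E"},
]
cleaned = [r for r in (clean_temperature_row(row) for row in raw) if r is not None]
print(len(cleaned), cleaned[0]["date"], cleaned[0]["month"], cleaned[0]["year"])
# 1 1743-11-01 11 1743
```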

#### Assessing US demographics data

In [11]:
dem_data.printSchema()
dem_data.show(5)

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: string (nullable = true)
 |-- Male Population: string (nullable = true)
 |-- Female Population: string (nullable = true)
 |-- Total Population: string (nullable = true)
 |-- Number of Veterans: string (nullable = true)
 |-- Foreign-born: string (nullable = true)
 |-- Average Household Size: string (nullable = true)
 |-- State Code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: string (nullable = true)

+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+-----+
|            City|        State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race|Count|
+----------------+-------------+----------+---------------+-----------------+----------------+--

The demographics data contains several numeric fields, such as population counts by gender and median age, that are stored as `String` in the dataframe.
* `Male Population`, `Female Population`, `Total Population` and `Foreign-born` need to be converted to the Long data type.
* `Median Age` needs to be converted to the Float data type.
* `Average Household Size` holds fractional values (for example 2.6 and 2.39), so it should also be converted to the Float data type; an Integer cast would truncate it.

The following cell shows how this data can be processed during the transformation stage of the pipeline.

In [12]:
valid_dem_data = dem_data.withColumnRenamed("State Code", "state_code") \
                    .withColumnRenamed("Median Age", "median_age") \
                    .withColumnRenamed("Male Population", "male_population") \
                    .withColumnRenamed("Female Population", "female_population") \
                    .withColumnRenamed("Total Population", "total_population") \
                    .withColumnRenamed("Foreign-born", "foreign_born") \
                    .withColumnRenamed("Average Household Size", "avg_household_size") \
                    .selectExpr("City as city", 
                        "State as state", 
                        "state_code",
                        "cast(male_population as bigint) as male_population", 
                        "cast(female_population as bigint) as female_population", 
                        "cast(total_population as bigint) as total_population", 
                        "cast(foreign_born as bigint) as foreign_born",
                        "cast(avg_household_size as float) as avg_household_size", 
                        "cast(median_age as float) as median_age", 
                        "Race as race")
valid_dem_data.printSchema()
valid_dem_data.show(5)

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- male_population: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- total_population: long (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- avg_household_size: float (nullable = true)
 |-- median_age: float (nullable = true)
 |-- race: string (nullable = true)

+----------------+-------------+----------+---------------+-----------------+----------------+------------+------------------+----------+--------------------+
|            city|        state|state_code|male_population|female_population|total_population|foreign_born|avg_household_size|median_age|                race|
+----------------+-------------+----------+---------------+-----------------+----------------+------------+------------------+----------+--------------------+
|   Silver Spring|     Maryland|        MD|          40601|            41862|           82463|

#### Assessing immigration data

In [13]:
immigration_data.printSchema()
immigration_data.show(5)

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [14]:
immigration_data.filter(immigration_data["i94port"] == "XXX").count()

3522

Many rows (3,522 in the April 2016 file) have the value `XXX` in the `i94port` column. `XXX` is not an IATA code assigned to any airport, which suggests it is used as a placeholder for an airport that has no IATA code assigned.

In [15]:
immigration_data.where(F.isnull(F.col("i94addr"))).count()

152592

Many rows (152,592) also have a null value for `i94addr`, the two-letter abbreviation of the first state to be visited by the visitor.
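
Both checks can also be computed in a single pass over the data. The sketch below is a hypothetical plain-Python illustration (the helper name and the dict record layout are assumptions); in Spark, the equivalent would be one aggregation with conditional sums rather than two separate `count()` jobs.

```python
from collections import Counter


def profile_immigration_rows(rows):
    """Count both data-quality issues in one pass over the records.

    Hypothetical helper: rows are dicts with 'i94port' and 'i94addr' keys.
    """
    issues = Counter()
    for row in rows:
        if row.get("i94port") == "XXX":
            issues["placeholder_port"] += 1
        if row.get("i94addr") is None:
            issues["missing_first_state"] += 1
    return issues


# The first two records of the April 2016 file, reduced to the relevant fields
rows = [
    {"cicid": 6.0, "i94port": "XXX", "i94addr": None},
    {"cicid": 7.0, "i94port": "ATL", "i94addr": "AL"},
]
print(dict(profile_immigration_rows(rows)))
# {'placeholder_port': 1, 'missing_first_state': 1}
```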

In [16]:
immigration_data.where(F.isnull(F.col("visatype"))).count()

0

In [17]:
immigration_data.selectExpr("visatype").distinct().show()

+--------+
|visatype|
+--------+
|      F2|
|     GMB|
|      B2|
|      F1|
|     CPL|
|      I1|
|      WB|
|      M1|
|      B1|
|      WT|
|      M2|
|      CP|
|     GMT|
|      E1|
|       I|
|      E2|
|     SBP|
+--------+



* `i94res` indicates the visitor's country of residence. In the SAS file it is stored as a numeric code, and the mapping from codes to country names is provided in the label description file of the original data source. The codes can be replaced by country names and added to the dataframe as a new column named `country_of_residence`.
* `arrdate`, the visitor's arrival date, is a SAS date value, i.e. the number of days since 1 January 1960. It can be converted to Spark's Date type and added to the dataframe as the column `arrival_date`.
* `i94yr`, the visitor's arrival year, can be cast to Integer and renamed to `arrival_year`.
* `i94mon`, the visitor's arrival month, can be cast to Integer and renamed to `arrival_month`.
* `biryear`, the visitor's year of birth, can be cast to Integer and renamed to `birth_year`.
* `i94bir`, the visitor's age, can be cast to Integer and renamed to `age`.
* `i94addr`, the first state to be visited, can be renamed to `first_state_visited`.
* `i94port`, the IATA code of the airport where the visitor arrived, can be renamed to `iata_code`.
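
For a single record, the conversions listed above can be sketched in plain Python. This is a hypothetical illustration, not the pipeline code: `transform_visitor` and the one-entry `CODE_TO_COUNTRY` excerpt are assumptions, and the sketch also carries `visatype` through as `visa_type`, a column listed in the `us_visitors` data model. The sample values come from the first record of the April 2016 file shown earlier.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this date

# Hypothetical one-entry excerpt of the i94res code-to-country mapping
CODE_TO_COUNTRY = {692.0: "ECUADOR"}


def sas_to_date(sas_days):
    """Convert a SAS numeric date (days since 1960-01-01) to a date; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


def _to_int(value):
    """Cast a SAS double to int, passing None through."""
    return int(value) if value is not None else None


def transform_visitor(raw):
    """Map one raw I-94 record (dict) to the columns of the us_visitors table."""
    return {
        "arrival_date": sas_to_date(raw.get("arrdate")),
        "arrival_year": _to_int(raw.get("i94yr")),
        "arrival_month": _to_int(raw.get("i94mon")),
        "iata_code": raw.get("i94port"),
        "first_state_visited": raw.get("i94addr"),
        "birth_year": _to_int(raw.get("biryear")),
        "age": _to_int(raw.get("i94bir")),
        "country_of_residence": CODE_TO_COUNTRY.get(raw.get("i94res")),
        "visa_type": raw.get("visatype"),
    }


raw = {"i94yr": 2016.0, "i94mon": 4.0, "i94res": 692.0, "i94port": "XXX",
       "arrdate": 20573.0, "i94addr": None, "i94bir": 37.0,
       "biryear": 1979.0, "visatype": "B2"}
print(transform_visitor(raw)["arrival_date"])  # 2016-04-29
```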

The Spark cell below performs this cleaning and transformation during the transformation step of the pipeline.

In [18]:
def get_date(sas_numeric_date):
    # SAS dates count days since 1960-01-01; pass nulls through unchanged
    if sas_numeric_date is None:
        return None
    return (pd.Timestamp('1960-01-01') + pd.to_timedelta(sas_numeric_date, unit='D')).date()
udf_date = udf(get_date, DateType())
code_to_country_string = """
 582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'
   236 =  'AFGHANISTAN'
   101 =  'ALBANIA'
   316 =  'ALGERIA'
   102 =  'ANDORRA'
   324 =  'ANGOLA'
   529 =  'ANGUILLA'
   518 =  'ANTIGUA-BARBUDA'
   687 =  'ARGENTINA '
   151 =  'ARMENIA'
   532 =  'ARUBA'
   438 =  'AUSTRALIA'
   103 =  'AUSTRIA'
   152 =  'AZERBAIJAN'
   512 =  'BAHAMAS'
   298 =  'BAHRAIN'
   274 =  'BANGLADESH'
   513 =  'BARBADOS'
   104 =  'BELGIUM'
   581 =  'BELIZE'
   386 =  'BENIN'
   509 =  'BERMUDA'
   153 =  'BELARUS'
   242 =  'BHUTAN'
   688 =  'BOLIVIA'
   717 =  'BONAIRE, ST EUSTATIUS, SABA' 
   164 =  'BOSNIA-HERZEGOVINA'
   336 =  'BOTSWANA'
   689 =  'BRAZIL'
   525 =  'BRITISH VIRGIN ISLANDS'
   217 =  'BRUNEI'
   105 =  'BULGARIA'
   393 =  'BURKINA FASO'
   243 =  'BURMA'
   375 =  'BURUNDI'
   310 =  'CAMEROON'
   326 =  'CAPE VERDE'
   526 =  'CAYMAN ISLANDS'
   383 =  'CENTRAL AFRICAN REPUBLIC'
   384 =  'CHAD'
   690 =  'CHILE'
   245 =  'CHINA, PRC'
   721 =  'CURACAO' 
   270 =  'CHRISTMAS ISLAND'
   271 =  'COCOS ISLANDS'
   691 =  'COLOMBIA'
   317 =  'COMOROS'
   385 =  'CONGO'
   467 =  'COOK ISLANDS'
   575 =  'COSTA RICA'
   165 =  'CROATIA'
   584 =  'CUBA'
   218 =  'CYPRUS'
   140 =  'CZECH REPUBLIC'
   723 =  'FAROE ISLANDS (PART OF DENMARK)'  
   108 =  'DENMARK'
   322 =  'DJIBOUTI'
   519 =  'DOMINICA'
   585 =  'DOMINICAN REPUBLIC'
   240 =  'EAST TIMOR'
   692 =  'ECUADOR'
   368 =  'EGYPT'
   576 =  'EL SALVADOR'
   399 =  'EQUATORIAL GUINEA'
   372 =  'ERITREA'
   109 =  'ESTONIA'
   369 =  'ETHIOPIA'
   604 =  'FALKLAND ISLANDS'
   413 =  'FIJI'
   110 =  'FINLAND'
   111 =  'FRANCE'
   601 =  'FRENCH GUIANA'
   411 =  'FRENCH POLYNESIA'
   387 =  'GABON'
   338 =  'GAMBIA'
   758 =  'GAZA STRIP' 
   154 =  'GEORGIA'
   112 =  'GERMANY'
   339 =  'GHANA'
   143 =  'GIBRALTAR'
   113 =  'GREECE'
   520 =  'GRENADA'
   507 =  'GUADELOUPE'
   577 =  'GUATEMALA'
   382 =  'GUINEA'
   327 =  'GUINEA-BISSAU'
   603 =  'GUYANA'
   586 =  'HAITI'
   726 =  'HEARD AND MCDONALD IS.'
   149 =  'HOLY SEE/VATICAN'
   528 =  'HONDURAS'
   206 =  'HONG KONG'
   114 =  'HUNGARY'
   115 =  'ICELAND'
   213 =  'INDIA'
   759 =  'INDIAN OCEAN AREAS (FRENCH)' 
   729 =  'INDIAN OCEAN TERRITORY' 
   204 =  'INDONESIA'
   249 =  'IRAN'
   250 =  'IRAQ'
   116 =  'IRELAND'
   251 =  'ISRAEL'
   117 =  'ITALY'
   388 =  'IVORY COAST'
   514 =  'JAMAICA'
   209 =  'JAPAN'
   253 =  'JORDAN'
   201 =  'KAMPUCHEA'
   155 =  'KAZAKHSTAN'
   340 =  'KENYA'
   414 =  'KIRIBATI'
   732 =  'KOSOVO' 
   272 =  'KUWAIT'
   156 =  'KYRGYZSTAN'
   203 =  'LAOS'
   118 =  'LATVIA'
   255 =  'LEBANON'
   335 =  'LESOTHO'
   370 =  'LIBERIA'
   381 =  'LIBYA'
   119 =  'LIECHTENSTEIN'
   120 =  'LITHUANIA'
   121 =  'LUXEMBOURG'
   214 =  'MACAU'
   167 =  'MACEDONIA'
   320 =  'MADAGASCAR'
   345 =  'MALAWI'
   273 =  'MALAYSIA'
   220 =  'MALDIVES'
   392 =  'MALI'
   145 =  'MALTA'
   472 =  'MARSHALL ISLANDS'
   511 =  'MARTINIQUE'
   389 =  'MAURITANIA'
   342 =  'MAURITIUS'
   760 =  'MAYOTTE (AFRICA - FRENCH)' 
   473 =  'MICRONESIA, FED. STATES OF'
   157 =  'MOLDOVA'
   122 =  'MONACO'
   299 =  'MONGOLIA'
   735 =  'MONTENEGRO' 
   521 =  'MONTSERRAT'
   332 =  'MOROCCO'
   329 =  'MOZAMBIQUE'
   371 =  'NAMIBIA'
   440 =  'NAURU'
   257 =  'NEPAL'
   123 =  'NETHERLANDS'
   508 =  'NETHERLANDS ANTILLES'
   409 =  'NEW CALEDONIA'
   464 =  'NEW ZEALAND'
   579 =  'NICARAGUA'
   390 =  'NIGER'
   343 =  'NIGERIA'
   470 =  'NIUE'
   275 =  'NORTH KOREA'
   124 =  'NORWAY'
   256 =  'OMAN'
   258 =  'PAKISTAN'
   474 =  'PALAU'
   743 =  'PALESTINE' 
   504 =  'PANAMA'
   441 =  'PAPUA NEW GUINEA'
   693 =  'PARAGUAY'
   694 =  'PERU'
   260 =  'PHILIPPINES'
   416 =  'PITCAIRN ISLANDS'
   107 =  'POLAND'
   126 =  'PORTUGAL'
   297 =  'QATAR'
   748 =  'REPUBLIC OF SOUTH SUDAN'
   321 =  'REUNION'
   127 =  'ROMANIA'
   158 =  'RUSSIA'
   376 =  'RWANDA'
   128 =  'SAN MARINO'
   330 =  'SAO TOME AND PRINCIPE'
   261 =  'SAUDI ARABIA'
   391 =  'SENEGAL'
   142 =  'SERBIA AND MONTENEGRO'
   745 =  'SERBIA' 
   347 =  'SEYCHELLES'
   348 =  'SIERRA LEONE'
   207 =  'SINGAPORE'
   141 =  'SLOVAKIA'
   166 =  'SLOVENIA'
   412 =  'SOLOMON ISLANDS'
   397 =  'SOMALIA'
   373 =  'SOUTH AFRICA'
   276 =  'SOUTH KOREA'
   129 =  'SPAIN'
   244 =  'SRI LANKA'
   346 =  'ST. HELENA'
   522 =  'ST. KITTS-NEVIS'
   523 =  'ST. LUCIA'
   502 =  'ST. PIERRE AND MIQUELON'
   524 =  'ST. VINCENT-GRENADINES'
   716 =  'SAINT BARTHELEMY' 
   736 =  'SAINT MARTIN' 
   749 =  'SAINT MAARTEN' 
   350 =  'SUDAN'
   602 =  'SURINAME'
   351 =  'SWAZILAND'
   130 =  'SWEDEN'
   131 =  'SWITZERLAND'
   262 =  'SYRIA'
   268 =  'TAIWAN'
   159 =  'TAJIKISTAN'
   353 =  'TANZANIA'
   263 =  'THAILAND'
   304 =  'TOGO'
   417 =  'TONGA'
   516 =  'TRINIDAD AND TOBAGO'
   323 =  'TUNISIA'
   264 =  'TURKEY'
   161 =  'TURKMENISTAN'
   527 =  'TURKS AND CAICOS ISLANDS'
   420 =  'TUVALU'
   352 =  'UGANDA'
   162 =  'UKRAINE'
   296 =  'UNITED ARAB EMIRATES'
   135 =  'UNITED KINGDOM'
   695 =  'URUGUAY'
   163 =  'UZBEKISTAN'
   410 =  'VANUATU'
   696 =  'VENEZUELA'
   266 =  'VIETNAM'
   469 =  'WALLIS AND FUTUNA ISLANDS'
   757 =  'WEST INDIES (FRENCH)' 
   333 =  'WESTERN SAHARA'
   465 =  'WESTERN SAMOA'
   216 =  'YEMEN'
   139 =  'YUGOSLAVIA'
   301 =  'ZAIRE'
   344 =  'ZAMBIA'
   315 =  'ZIMBABWE'
   403 =  'INVALID: AMERICAN SAMOA'
   712 =  'INVALID: ANTARCTICA' 
   700 =  'INVALID: BORN ON BOARD SHIP'
   719 =  'INVALID: BOUVET ISLAND (ANTARCTICA/NORWAY TERR.)'
   574 =  'INVALID: CANADA'
   720 =  'INVALID: CANTON AND ENDERBURY ISLS' 
   106 =  'INVALID: CZECHOSLOVAKIA'
   739 =  'INVALID: DRONNING MAUD LAND (ANTARCTICA-NORWAY)' 
   394 =  'INVALID: FRENCH SOUTHERN AND ANTARCTIC'
   501 =  'INVALID: GREENLAND'
   404 =  'INVALID: GUAM'
   730 =  'INVALID: INTERNATIONAL WATERS' 
   731 =  'INVALID: JOHNSON ISLAND' 
   471 =  'INVALID: MARIANA ISLANDS, NORTHERN'
   737 =  'INVALID: MIDWAY ISLANDS' 
   753 =  'INVALID: MINOR OUTLYING ISLANDS - USA'
   740 =  'INVALID: NEUTRAL ZONE (S. ARABIA/IRAQ)' 
   710 =  'INVALID: NON-QUOTA IMMIGRANT'
   505 =  'INVALID: PUERTO RICO'
    0  =  'INVALID: STATELESS'
   705 =  'INVALID: STATELESS'
   583 =  'INVALID: UNITED STATES'
   407 =  'INVALID: UNITED STATES'
   999 =  'INVALID: UNKNOWN'
   239 =  'INVALID: UNKNOWN COUNTRY'
   134 =  'INVALID: USSR'
   506 =  'INVALID: U.S. VIRGIN ISLANDS'
   755 =  'INVALID: WAKE ISLAND'  
   311 =  'Collapsed Tanzania (should not show)'
   741 =  'Collapsed Curacao (should not show)'
    54 =  'No Country Code (54)'
   100 =  'No Country Code (100)'
   187 =  'No Country Code (187)'
   190 =  'No Country Code (190)'
   200 =  'No Country Code (200)'
   219 =  'No Country Code (219)'
   238 =  'No Country Code (238)'
   277 =  'No Country Code (277)'
   293 =  'No Country Code (293)'
   300 =  'No Country Code (300)'
   319 =  'No Country Code (319)'
   365 =  'No Country Code (365)'
   395 =  'No Country Code (395)'
   400 =  'No Country Code (400)'
   485 =  'No Country Code (485)'
   503 =  'No Country Code (503)'
   589 =  'No Country Code (589)'
   592 =  'No Country Code (592)'
   791 =  'No Country Code (791)'
   849 =  'No Country Code (849)'
   914 =  'No Country Code (914)'
   944 =  'No Country Code (944)'
   996 =  'No Country Code (996)'
 """
code_to_country = {}
for line in code_to_country_string.split("\n"):
    line = line.strip()
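    # Capture everything between the quotes, since names such as 'CHINA, PRC'
    # or 'BOSNIA-HERZEGOVINA' contain commas, hyphens and slashes
    match = re.search(r"(\d+)\s*=\s*'([^']+)'", line)
    if match:
        code = match.group(1)
        # Remove padding (as in 'ARGENTINA ') and the 'INVALID: ' prefix
        country = match.group(2).strip().replace("INVALID: ", "")
        code_to_country[float(code)] = country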
udf_code_to_country = udf(lambda code : code_to_country.get(code, None))        
valid_migration_data = immigration_data.withColumn("arrival_date", udf_date(immigration_data.arrdate)) \
                                        .withColumn("country_of_residence", udf_code_to_country(immigration_data.i94res)) \
                                        .withColumnRenamed("i94port", "iata_code") \
                                        .selectExpr("arrival_date",
                                                    "cast(i94yr as int) as arrival_year", 
                                                    "cast(i94mon as int) as arrival_month",                                                    
                                                    "iata_code",
                                                    "i94addr as first_state_visited",
                                                    "cast(biryear as int) as birth_year",
                                                    "cast(i94bir as int) as age",
                                                    "country_of_residence")
valid_migration_data.show(5)
valid_migration_data.printSchema()

+------------+------------+-------------+---------+-------------------+----------+---+--------------------+
|arrival_date|arrival_year|arrival_month|iata_code|first_state_visited|birth_year|age|country_of_residence|
+------------+------------+-------------+---------+-------------------+----------+---+--------------------+
|  2016-04-29|        2016|            4|      XXX|               null|      1979| 37|             ECUADOR|
|  2016-04-07|        2016|            4|      ATL|                 AL|      1991| 25|         SOUTH KOREA|
|  2016-04-01|        2016|            4|      WAS|                 MI|      1961| 55|             ALBANIA|
|  2016-04-01|        2016|            4|      NYC|                 MA|      1988| 28|             ALBANIA|
|  2016-04-01|        2016|            4|      NYC|                 MA|      2012|  4|             ALBANIA|
+------------+------------+-------------+---------+-------------------+----------+---+--------------------+
only showing top 5 rows

roo
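
Keeping the label parsing in a small function makes it easy to test against the awkward entries in the label file, such as names containing commas, hyphens or trailing spaces (`'CHINA, PRC'`, `'BOSNIA-HERZEGOVINA'`, `'ARGENTINA '`). The sketch below assumes the `code = 'name'` layout of `code_to_country_string`; `parse_country_labels` is a hypothetical name.

```python
import re

# code = 'name', capturing everything between the single quotes
LABEL_PATTERN = re.compile(r"(\d+)\s*=\s*'([^']+)'")


def parse_country_labels(text):
    """Parse `code = 'name'` lines into a {float code: name} dict.

    Float keys match the double-typed i94res column; the 'INVALID: ' prefix
    and surrounding whitespace are removed from names.
    """
    mapping = {}
    for line in text.splitlines():
        match = LABEL_PATTERN.search(line)
        if match:
            name = match.group(2).strip().replace("INVALID: ", "")
            mapping[float(match.group(1))] = name
    return mapping


sample = """
   245 =  'CHINA, PRC'
   164 =  'BOSNIA-HERZEGOVINA'
   687 =  'ARGENTINA '
   574 =  'INVALID: CANADA'
"""
print(parse_country_labels(sample))
# {245.0: 'CHINA, PRC', 164.0: 'BOSNIA-HERZEGOVINA', 687.0: 'ARGENTINA', 574.0: 'CANADA'}
```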

### Data Model
#### Conceptual Data Model
I decided to represent this data in the schema with
* Two Dimension tables called `airports_in_usa` and `us_demographics`.
* Two Fact tables called `temperatures` and `us_visitors`

The `airports_in_usa` will contain the information about airports with attributes:
* iata_code: IATA code of airport
* municipality: name of jurisdiction where the airport is located

Because `airports_in_usa` stores label-like, categorical data, it can easily be considered a Dimension table.

The `us_demographics` table will contain a snapshot of the population of various US cities in 2015, with the attributes:
* city: name of the city
* state: name of the US state
* state_code: two letter abbreviated state code for the state
* male_population: population count of males
* female_population: population count of females
* total_population: total population of the city
* foreign_born: population count of foreign born residents
* avg_household_size: average household size
* median_age: median age of people
* race: race of the people

The `us_demographics` table contains population counts for various US cities. Census figures are published on a yearly basis, so this data is updated infrequently, which is why it can be treated as a Dimension table.

The `temperatures` table will contain information about the average temperature of different cities around the world, with the following columns:
* city (string): name of city
* country (string): name of country
* latitude (string): latitude of the city
* longitude (string): longitude of the city
* avg_temperature (float): average temperature for the month
* avg_temperature_uncertainty (float): the 95% confidence interval around the average temperature
* date (date): first day of the month to which the temperature reading refers
* month (integer): month of the temperature reading
* year (integer): year of the temperature reading

The data stored in the `temperatures` table is time-series data, with numeric values representing the average temperatures of cities around the world. The table can be updated regularly by appending rows as new average temperature readings for cities around the world become available. For this reason, it can be considered a Fact table in this schema.
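
Because each row in the source data is a monthly average, a natural analytical step is to reduce the series to a per-city climatology: the mean of `avg_temperature` for each calendar month across all years. A minimal plain-Python sketch, assuming rows shaped like the `temperatures` table; `monthly_climatology` is hypothetical, and in Spark this would roughly correspond to `groupBy("city", "month").avg("avg_temperature")`.

```python
from collections import defaultdict


def monthly_climatology(rows):
    """Average avg_temperature per (city, month) across all years.

    Hypothetical helper: rows are dicts with 'city', 'month' and
    'avg_temperature' keys; rows with a missing temperature are skipped.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        temperature = row.get("avg_temperature")
        if temperature is None:
            continue
        key = (row["city"], row["month"])
        totals[key] += temperature
        counts[key] += 1
    return {key: totals[key] / counts[key] for key in totals}


# Illustrative values, not taken from the dataset
rows = [
    {"city": "Atlanta", "month": 1, "year": 2011, "avg_temperature": 4.0},
    {"city": "Atlanta", "month": 1, "year": 2012, "avg_temperature": 6.0},
    {"city": "Atlanta", "month": 7, "year": 2012, "avg_temperature": 27.5},
]
print(monthly_climatology(rows))
# {('Atlanta', 1): 5.0, ('Atlanta', 7): 27.5}
```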

The `us_visitors` table will contain time-series data of visitors arriving at airports in the USA on each day of the year. The columns in the table will be:
* arrival_date: date of arrival of visitor
* arrival_month: month of arrival of visitor
* arrival_year: year of arrival of visitor
* country_of_residence: country of residence of visitor
* iata_code: IATA code of airport where the visitor arrived
* first_state_visited: two-letter abbreviation of the first state to be visited by the visitor
* birth_year: birth year of visitor
* age: age of visitor on arrival to USA
* visa_type: type of visa of visitor
* airport_municipality: place where airport of arrival is located

Currently it holds data for 2016. It can be updated by appending new rows as new visitor data becomes available. Because this table can grow to millions of rows of facts, it is treated as a Fact table. The location of the arrival airport is added in the `airport_municipality` column so that analytical queries can aggregate over it, and so that the table can be joined with the `temperatures` table when analysing average city temperatures against immigration patterns.
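The enrichment described above is an inner join on `iata_code`. A minimal pure-Python sketch, using made-up sample rows (hypothetical, not taken from the dataset), shows one consequence of that choice: visitors whose arrival port has no match in the airports table are dropped rather than kept with an empty municipality. The function names `enrich_inner` and `enrich_left` are illustrative only.

```python
from typing import Dict, List, Optional

# Hypothetical sample rows for illustration only.
visitors: List[Dict[str, Optional[str]]] = [
    {"iata_code": "BOS", "country_of_residence": "ALBANIA", "visa_type": "F1"},
    {"iata_code": "XXX", "country_of_residence": "ALBANIA", "visa_type": "B2"},
]
airports: Dict[str, str] = {"BOS": "Boston"}  # iata_code -> municipality


def enrich_inner(rows, airport_lookup):
    """Inner join: keep only visitors whose iata_code exists in airport_lookup."""
    return [
        {**row, "airport_municipality": airport_lookup[row["iata_code"]]}
        for row in rows
        if row["iata_code"] in airport_lookup
    ]


def enrich_left(rows, airport_lookup):
    """Left join: keep every visitor; unmatched ports get None."""
    return [
        {**row, "airport_municipality": airport_lookup.get(row["iata_code"])}
        for row in rows
    ]


inner = enrich_inner(visitors, airports)
left = enrich_left(visitors, airports)
print(len(inner), len(left))  # 1 2
```

In Spark SQL the same choice is `JOIN` versus `LEFT JOIN`; the pipeline below uses the inner form, so the row count of `us_visitors` can be lower than the number of raw I94 records.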

#### Map of Data Pipelines
The following steps are performed in the data pipelines to create the tables in the model described above.  
1. Airports data is stored in a CSV file. It is read into a Spark dataframe, transformed as needed to prepare the `airports_in_usa` table, and loaded in Parquet format as a Dimension table in the data model.
2. Temperatures data is stored in a CSV file. It is read into a Spark dataframe, transformed as needed to prepare the `temperatures` table, and loaded in Parquet format as a Fact table in the data model.
3. US cities demographics data from 2015 is stored in a CSV file. It is read into a Spark dataframe, transformed as needed to prepare the `us_demographics` table, and loaded in Parquet format as a Dimension table in the data model.
4. Immigration data is stored in SAS binary data files, one file for each month of 2016. The ETL process loops through each month of the year, extracts the data from that month's file, performs the necessary transformations, and combines the result with the data of earlier months into one table called `us_visitors`, which is loaded in Parquet format as a Fact table in the data model. The municipality of the visitor's arrival airport is stored in the airports dataset; it is added to this table as `airport_municipality` during the transformation stage.
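Step 4 loops over one SAS file per month. A minimal sketch of how the twelve file paths can be generated from the `i94_{month}{year}_sub.sas7bdat` pattern used by `extract_immigration_data`; the base directory is copied from that function and is an assumption about the local environment, and `monthly_sas_paths` is a hypothetical helper name.

```python
from typing import List

# Base directory taken from extract_immigration_data; adjust for your environment.
BASE_DIR = "../../data/18-83510-I94-Data-2016"

# Lower-case month abbreviations, matching the list looped over in the notebook.
MONTHS = ("jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec")


def monthly_sas_paths(year: int, base_dir: str = BASE_DIR) -> List[str]:
    """Return one I94 SAS file path per month, e.g. .../i94_jan16_sub.sas7bdat."""
    two_digit_year = year % 100
    return [
        f"{base_dir}/i94_{month}{two_digit_year:02d}_sub.sas7bdat"
        for month in MONTHS
    ]


for path in monthly_sas_paths(2016)[:2]:
    print(path)
```

Generating the list up front makes it easy to skip a month whose file is missing, or to extend the loop to more years, without editing the nested loop by hand.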

### Running Pipelines to Model the Data 
#### Creating the data model
Below is the code for the methods that are called to execute the ETL steps on the downloaded datasets and create the final tables described in the data model.

In [19]:
def extract_temperature_data():
    return spark.read.format("csv").option("header", True).load("../../data2/GlobalLandTemperaturesByCity.csv")

In [20]:
def transform_temperature_data(temp_data):
    # Drop rows without an AverageTemperature reading, parse the "dt" string
    # (yyyy-MM-dd) into a DateType column with Spark's built-in to_date
    # (no Python UDF needed), then rename and cast columns to the target schema.
    clean_temp_data = temp_data.where(F.col("AverageTemperature").isNotNull()) \
                               .withColumn("date", F.to_date(F.col("dt"), "yyyy-MM-dd")) \
                               .selectExpr(["City as city",
                                            "Country as country",
                                            "Latitude as latitude",
                                            "Longitude as longitude",
                                            "cast(AverageTemperature as float) as avg_temperature",
                                            "cast(AverageTemperatureUncertainty as float) as avg_temperature_uncertainty",
                                            "date",
                                            "month(date) as month",
                                            "year(date) as year"])
    return clean_temp_data

In [21]:
def load_temperature_data(df):
    df.write.parquet(path="temperatures.parquet", mode="overwrite")

In [None]:
temp_data_raw = extract_temperature_data()
transformed_data = transform_temperature_data(temp_data_raw)
load_temperature_data(transformed_data)

In [None]:
def extract_airports_data():
    return spark.read.format("csv").option("header", True).load("airport-codes_csv.csv")

def transform_airports_data(ap_data):
    # Keep US airports that have an IATA code. With Spark's default CSV options,
    # empty fields are read as null, so test with isNotNull() rather than
    # comparing against the string "null".
    us_ap_data = ap_data.filter(ap_data["iso_country"] == "US") \
                        .filter(ap_data["iata_code"].isNotNull()) \
                        .selectExpr("iata_code", "municipality")
    return us_ap_data

def load_airports_data(us_ap_data):
    us_ap_data.write.parquet(path="airports_in_usa.parquet", mode="overwrite")

In [None]:
ap_data_raw = extract_airports_data()
us_airports_data = transform_airports_data(ap_data_raw)
load_airports_data(us_airports_data)

In [None]:
def extract_demographics_data():
    return spark.read.format("csv").option("header", True).option("delimiter", ";").load("us-cities-demographics.csv")
def transform_demographics_data(dem_data):
    valid_dem_data = dem_data.withColumnRenamed("State Code", "state_code") \
                    .withColumnRenamed("Median Age", "median_age") \
                    .withColumnRenamed("Male Population", "male_population") \
                    .withColumnRenamed("Female Population", "female_population") \
                    .withColumnRenamed("Total Population", "total_population") \
                    .withColumnRenamed("Foreign-born", "foreign_born") \
                    .withColumnRenamed("Average Household Size", "avg_household_size") \
                    .selectExpr("City as city", 
                        "State as state", 
                        "state_code",
                        "cast(male_population as bigint) as male_population", 
                        "cast(female_population as bigint) as female_population", 
                        "cast(total_population as bigint) as total_population",
                        "cast(foreign_born as bigint) as foreign_born",
                        "cast(avg_household_size as int) as avg_household_size", 
                        "cast(median_age as float) as median_age", 
                        "Race as race")
    return valid_dem_data
def load_demographics_data(dem_data):
    dem_data.write.parquet(path="us_demographics.parquet", partitionBy=["state_code"], mode="overwrite")

In [None]:
dem_data_raw = extract_demographics_data()
valid_dem_data = transform_demographics_data(dem_data_raw)
load_demographics_data(valid_dem_data)

In [47]:
def extract_immigration_data (month, year):
    data = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_{}{}_sub.sas7bdat'.format(month, year))
    return data

def transform_migration_data(immigration_data):
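    def get_date(sas_numeric_date):
        # SAS stores dates as the number of days since 1960-01-01
        if sas_numeric_date is None:
            return None
        date = pd.Timestamp('1960-01-01') + pd.to_timedelta(sas_numeric_date, unit='D')
        # DateType expects a datetime.date
        return date.date()
    udf_date = udf(get_date, DateType())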
    code_to_country_string = """
     582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'
       236 =  'AFGHANISTAN'
       101 =  'ALBANIA'
       316 =  'ALGERIA'
       102 =  'ANDORRA'
       324 =  'ANGOLA'
       529 =  'ANGUILLA'
       518 =  'ANTIGUA-BARBUDA'
       687 =  'ARGENTINA '
       151 =  'ARMENIA'
       532 =  'ARUBA'
       438 =  'AUSTRALIA'
       103 =  'AUSTRIA'
       152 =  'AZERBAIJAN'
       512 =  'BAHAMAS'
       298 =  'BAHRAIN'
       274 =  'BANGLADESH'
       513 =  'BARBADOS'
       104 =  'BELGIUM'
       581 =  'BELIZE'
       386 =  'BENIN'
       509 =  'BERMUDA'
       153 =  'BELARUS'
       242 =  'BHUTAN'
       688 =  'BOLIVIA'
       717 =  'BONAIRE, ST EUSTATIUS, SABA' 
       164 =  'BOSNIA-HERZEGOVINA'
       336 =  'BOTSWANA'
       689 =  'BRAZIL'
       525 =  'BRITISH VIRGIN ISLANDS'
       217 =  'BRUNEI'
       105 =  'BULGARIA'
       393 =  'BURKINA FASO'
       243 =  'BURMA'
       375 =  'BURUNDI'
       310 =  'CAMEROON'
       326 =  'CAPE VERDE'
       526 =  'CAYMAN ISLANDS'
       383 =  'CENTRAL AFRICAN REPUBLIC'
       384 =  'CHAD'
       690 =  'CHILE'
       245 =  'CHINA, PRC'
       721 =  'CURACAO' 
       270 =  'CHRISTMAS ISLAND'
       271 =  'COCOS ISLANDS'
       691 =  'COLOMBIA'
       317 =  'COMOROS'
       385 =  'CONGO'
       467 =  'COOK ISLANDS'
       575 =  'COSTA RICA'
       165 =  'CROATIA'
       584 =  'CUBA'
       218 =  'CYPRUS'
       140 =  'CZECH REPUBLIC'
       723 =  'FAROE ISLANDS (PART OF DENMARK)'  
       108 =  'DENMARK'
       322 =  'DJIBOUTI'
       519 =  'DOMINICA'
       585 =  'DOMINICAN REPUBLIC'
       240 =  'EAST TIMOR'
       692 =  'ECUADOR'
       368 =  'EGYPT'
       576 =  'EL SALVADOR'
       399 =  'EQUATORIAL GUINEA'
       372 =  'ERITREA'
       109 =  'ESTONIA'
       369 =  'ETHIOPIA'
       604 =  'FALKLAND ISLANDS'
       413 =  'FIJI'
       110 =  'FINLAND'
       111 =  'FRANCE'
       601 =  'FRENCH GUIANA'
       411 =  'FRENCH POLYNESIA'
       387 =  'GABON'
       338 =  'GAMBIA'
       758 =  'GAZA STRIP' 
       154 =  'GEORGIA'
       112 =  'GERMANY'
       339 =  'GHANA'
       143 =  'GIBRALTAR'
       113 =  'GREECE'
       520 =  'GRENADA'
       507 =  'GUADELOUPE'
       577 =  'GUATEMALA'
       382 =  'GUINEA'
       327 =  'GUINEA-BISSAU'
       603 =  'GUYANA'
       586 =  'HAITI'
       726 =  'HEARD AND MCDONALD IS.'
       149 =  'HOLY SEE/VATICAN'
       528 =  'HONDURAS'
       206 =  'HONG KONG'
       114 =  'HUNGARY'
       115 =  'ICELAND'
       213 =  'INDIA'
       759 =  'INDIAN OCEAN AREAS (FRENCH)' 
       729 =  'INDIAN OCEAN TERRITORY' 
       204 =  'INDONESIA'
       249 =  'IRAN'
       250 =  'IRAQ'
       116 =  'IRELAND'
       251 =  'ISRAEL'
       117 =  'ITALY'
       388 =  'IVORY COAST'
       514 =  'JAMAICA'
       209 =  'JAPAN'
       253 =  'JORDAN'
       201 =  'KAMPUCHEA'
       155 =  'KAZAKHSTAN'
       340 =  'KENYA'
       414 =  'KIRIBATI'
       732 =  'KOSOVO' 
       272 =  'KUWAIT'
       156 =  'KYRGYZSTAN'
       203 =  'LAOS'
       118 =  'LATVIA'
       255 =  'LEBANON'
       335 =  'LESOTHO'
       370 =  'LIBERIA'
       381 =  'LIBYA'
       119 =  'LIECHTENSTEIN'
       120 =  'LITHUANIA'
       121 =  'LUXEMBOURG'
       214 =  'MACAU'
       167 =  'MACEDONIA'
       320 =  'MADAGASCAR'
       345 =  'MALAWI'
       273 =  'MALAYSIA'
       220 =  'MALDIVES'
       392 =  'MALI'
       145 =  'MALTA'
       472 =  'MARSHALL ISLANDS'
       511 =  'MARTINIQUE'
       389 =  'MAURITANIA'
       342 =  'MAURITIUS'
       760 =  'MAYOTTE (AFRICA - FRENCH)' 
       473 =  'MICRONESIA, FED. STATES OF'
       157 =  'MOLDOVA'
       122 =  'MONACO'
       299 =  'MONGOLIA'
       735 =  'MONTENEGRO' 
       521 =  'MONTSERRAT'
       332 =  'MOROCCO'
       329 =  'MOZAMBIQUE'
       371 =  'NAMIBIA'
       440 =  'NAURU'
       257 =  'NEPAL'
       123 =  'NETHERLANDS'
       508 =  'NETHERLANDS ANTILLES'
       409 =  'NEW CALEDONIA'
       464 =  'NEW ZEALAND'
       579 =  'NICARAGUA'
       390 =  'NIGER'
       343 =  'NIGERIA'
       470 =  'NIUE'
       275 =  'NORTH KOREA'
       124 =  'NORWAY'
       256 =  'OMAN'
       258 =  'PAKISTAN'
       474 =  'PALAU'
       743 =  'PALESTINE' 
       504 =  'PANAMA'
       441 =  'PAPUA NEW GUINEA'
       693 =  'PARAGUAY'
       694 =  'PERU'
       260 =  'PHILIPPINES'
       416 =  'PITCAIRN ISLANDS'
       107 =  'POLAND'
       126 =  'PORTUGAL'
       297 =  'QATAR'
       748 =  'REPUBLIC OF SOUTH SUDAN'
       321 =  'REUNION'
       127 =  'ROMANIA'
       158 =  'RUSSIA'
       376 =  'RWANDA'
       128 =  'SAN MARINO'
       330 =  'SAO TOME AND PRINCIPE'
       261 =  'SAUDI ARABIA'
       391 =  'SENEGAL'
       142 =  'SERBIA AND MONTENEGRO'
       745 =  'SERBIA' 
       347 =  'SEYCHELLES'
       348 =  'SIERRA LEONE'
       207 =  'SINGAPORE'
       141 =  'SLOVAKIA'
       166 =  'SLOVENIA'
       412 =  'SOLOMON ISLANDS'
       397 =  'SOMALIA'
       373 =  'SOUTH AFRICA'
       276 =  'SOUTH KOREA'
       129 =  'SPAIN'
       244 =  'SRI LANKA'
       346 =  'ST. HELENA'
       522 =  'ST. KITTS-NEVIS'
       523 =  'ST. LUCIA'
       502 =  'ST. PIERRE AND MIQUELON'
       524 =  'ST. VINCENT-GRENADINES'
       716 =  'SAINT BARTHELEMY' 
       736 =  'SAINT MARTIN' 
       749 =  'SAINT MAARTEN' 
       350 =  'SUDAN'
       602 =  'SURINAME'
       351 =  'SWAZILAND'
       130 =  'SWEDEN'
       131 =  'SWITZERLAND'
       262 =  'SYRIA'
       268 =  'TAIWAN'
       159 =  'TAJIKISTAN'
       353 =  'TANZANIA'
       263 =  'THAILAND'
       304 =  'TOGO'
       417 =  'TONGA'
       516 =  'TRINIDAD AND TOBAGO'
       323 =  'TUNISIA'
       264 =  'TURKEY'
       161 =  'TURKMENISTAN'
       527 =  'TURKS AND CAICOS ISLANDS'
       420 =  'TUVALU'
       352 =  'UGANDA'
       162 =  'UKRAINE'
       296 =  'UNITED ARAB EMIRATES'
       135 =  'UNITED KINGDOM'
       695 =  'URUGUAY'
       163 =  'UZBEKISTAN'
       410 =  'VANUATU'
       696 =  'VENEZUELA'
       266 =  'VIETNAM'
       469 =  'WALLIS AND FUTUNA ISLANDS'
       757 =  'WEST INDIES (FRENCH)' 
       333 =  'WESTERN SAHARA'
       465 =  'WESTERN SAMOA'
       216 =  'YEMEN'
       139 =  'YUGOSLAVIA'
       301 =  'ZAIRE'
       344 =  'ZAMBIA'
       315 =  'ZIMBABWE'
       403 =  'INVALID: AMERICAN SAMOA'
       712 =  'INVALID: ANTARCTICA' 
       700 =  'INVALID: BORN ON BOARD SHIP'
       719 =  'INVALID: BOUVET ISLAND (ANTARCTICA/NORWAY TERR.)'
       574 =  'INVALID: CANADA'
       720 =  'INVALID: CANTON AND ENDERBURY ISLS' 
       106 =  'INVALID: CZECHOSLOVAKIA'
       739 =  'INVALID: DRONNING MAUD LAND (ANTARCTICA-NORWAY)' 
       394 =  'INVALID: FRENCH SOUTHERN AND ANTARCTIC'
       501 =  'INVALID: GREENLAND'
       404 =  'INVALID: GUAM'
       730 =  'INVALID: INTERNATIONAL WATERS' 
       731 =  'INVALID: JOHNSON ISLAND' 
       471 =  'INVALID: MARIANA ISLANDS, NORTHERN'
       737 =  'INVALID: MIDWAY ISLANDS' 
       753 =  'INVALID: MINOR OUTLYING ISLANDS - USA'
       740 =  'INVALID: NEUTRAL ZONE (S. ARABIA/IRAQ)' 
       710 =  'INVALID: NON-QUOTA IMMIGRANT'
       505 =  'INVALID: PUERTO RICO'
        0  =  'INVALID: STATELESS'
       705 =  'INVALID: STATELESS'
       583 =  'INVALID: UNITED STATES'
       407 =  'INVALID: UNITED STATES'
       999 =  'INVALID: UNKNOWN'
       239 =  'INVALID: UNKNOWN COUNTRY'
       134 =  'INVALID: USSR'
       506 =  'INVALID: U.S. VIRGIN ISLANDS'
       755 =  'INVALID: WAKE ISLAND'  
       311 =  'Collapsed Tanzania (should not show)'
       741 =  'Collapsed Curacao (should not show)'
        54 =  'No Country Code (54)'
       100 =  'No Country Code (100)'
       187 =  'No Country Code (187)'
       190 =  'No Country Code (190)'
       200 =  'No Country Code (200)'
       219 =  'No Country Code (219)'
       238 =  'No Country Code (238)'
       277 =  'No Country Code (277)'
       293 =  'No Country Code (293)'
       300 =  'No Country Code (300)'
       319 =  'No Country Code (319)'
       365 =  'No Country Code (365)'
       395 =  'No Country Code (395)'
       400 =  'No Country Code (400)'
       485 =  'No Country Code (485)'
       503 =  'No Country Code (503)'
       589 =  'No Country Code (589)'
       592 =  'No Country Code (592)'
       791 =  'No Country Code (791)'
       849 =  'No Country Code (849)'
       914 =  'No Country Code (914)'
       944 =  'No Country Code (944)'
       996 =  'No Country Code (996)'
     """
    code_to_country = {}
    for line in code_to_country_string.split("\n"):
        # Accept any character except a quote inside the name, so entries such as
        # 'CHINA, PRC', 'ANTIGUA-BARBUDA' and 'HOLY SEE/VATICAN' are not skipped
        match = re.search(r"(\d+)\s*=\s*'([^']+)'", line.strip())
        if match:
            code = match.group(1)
            country = match.group(2).strip().replace("INVALID: ", "")
            # i94res is read as a double, so the lookup keys are floats
            code_to_country[float(code)] = country
    udf_code_to_country = udf(lambda code: code_to_country.get(code, None), StringType())

    valid_immigration_data = immigration_data.withColumn("arrival_date", udf_date(immigration_data.arrdate)) \
                                             .withColumn("country_of_residence", udf_code_to_country(immigration_data.i94res)) \
                                             .withColumnRenamed("i94port", "iata_code") \
                                             .selectExpr("cast(i94yr as int) as arrival_year",
                                                         "cast(i94mon as int) as arrival_month",
                                                         "arrival_date",
                                                         "iata_code",
                                                         "i94addr as first_state_visited",
                                                         "cast(biryear as int) as birth_year",
                                                         "cast(i94bir as int) as age",
                                                         "country_of_residence",
                                                         "visatype as visa_type")

    return valid_immigration_data

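Two details of `transform_migration_data` are easy to get wrong and can be checked without Spark. SAS stores dates as the number of days since 1960-01-01, and the country lookup is built by parsing the `code = 'NAME'` lines with a regular expression; a pattern that only allows word characters, dots, parentheses, spaces and colons silently skips names such as `'CHINA, PRC'` or `'ANTIGUA-BARBUDA'`. The sketch below uses only the standard library; `sas_to_date` and `parse_country_codes` are illustrative names, not part of the pipeline, and the sample lines are copied from the code list above.

```python
import re
from datetime import date, timedelta
from typing import Dict, Optional

SAS_EPOCH = date(1960, 1, 1)


def sas_to_date(sas_days: Optional[float]) -> Optional[date]:
    """Convert a SAS numeric date (days since 1960-01-01) to a date."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


# Accept any characters except a quote inside the name.
CODE_LINE = re.compile(r"(\d+)\s*=\s*'([^']+)'")


def parse_country_codes(text: str) -> Dict[float, str]:
    """Map float codes (i94res is read as a double) to country names."""
    mapping = {}
    for match in CODE_LINE.finditer(text):
        name = match.group(2).strip().replace("INVALID: ", "")
        mapping[float(match.group(1))] = name
    return mapping


sample = """
   245 =  'CHINA, PRC'
   518 =  'ANTIGUA-BARBUDA'
   687 =  'ARGENTINA '
   583 =  'INVALID: UNITED STATES'
"""
codes = parse_country_codes(sample)
print(sas_to_date(20454))  # 2016-01-01
print(codes[245.0])        # CHINA, PRC
```

Stripping the captured name also removes trailing spaces such as the one in `'ARGENTINA '`, so values in `country_of_residence` group cleanly in later aggregations.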

In [72]:
immigration_table = None
for month in ["jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"]:
    for year in [16]:
        immigration_staging = extract_immigration_data(month, year)
        immigration_valid = transform_migration_data(immigration_staging)
        # Append this month's rows to the months processed so far
        if immigration_table is not None:
            immigration_table = immigration_table.union(immigration_valid)
        else:
            immigration_table = immigration_valid

immigration_table.createOrReplaceTempView("valid_immigration")
ap_data = extract_airports_data()
ap_data.createOrReplaceTempView("airports")
# Add the municipality of the arrival airport to each visitor record
enriched_immigration_table = spark.sql("""
    SELECT valid_immigration.*, airports.municipality AS airport_municipality
    FROM valid_immigration
    JOIN airports ON (airports.iata_code = valid_immigration.iata_code)
""")

In [73]:
enriched_immigration_table.show(5)

+------------+-------------+------------+---------+-------------------+----------+---+--------------------+---------+--------------------+
|arrival_year|arrival_month|arrival_date|iata_code|first_state_visited|birth_year|age|country_of_residence|visa_type|airport_municipality|
+------------+-------------+------------+---------+-------------------+----------+---+--------------------+---------+--------------------+
|        2016|            1|  2016-01-12|      BOS|                 MA|      1996| 20|             ALBANIA|       F1|              Boston|
|        2016|            1|  2016-01-12|      BOS|                 MA|      1996| 20|             ALBANIA|       F1|              Boston|
|        2016|            1|  2016-01-16|      BOS|                 CT|      1999| 17|             ALBANIA|       B2|              Boston|
|        2016|            1|  2016-01-16|      BOS|                 CT|      1971| 45|             ALBANIA|       B2|              Boston|
|        2016|            1

In [73]:
enriched_immigration_table.write.parquet(path="us_visitors.parquet", partitionBy=["arrival_year", "arrival_month"], mode="overwrite")

#### Data Quality Checks
Two types of quality checks are performed for each of the tables created at the end of the ETL process.
1. Does the table have desired schema? 
2. Does the table have non-zero number of rows?
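The same two checks are repeated below for every table. One way to factor them into a single reusable function is sketched here; `check_table` is a hypothetical helper, and it works on plain values so it can be tested without Spark. With a Spark DataFrame `df`, the inputs would be `df.count()` and `{f.name: f.dataType for f in df.schema.fields}`, compared against the same `(column, DataType)` lists used in the cells below.

```python
from typing import Any, Dict, List, Sequence, Tuple


def check_table(
    name: str,
    row_count: int,
    actual_schema: Dict[str, Any],
    expected_schema: Sequence[Tuple[str, Any]],
) -> List[str]:
    """Return a list of problems; an empty list means both checks passed."""
    problems = []
    if row_count == 0:
        problems.append(f"{name} is empty")
    for column, expected_type in expected_schema:
        if column not in actual_schema:
            problems.append(f"{name}.{column} is missing")
        elif actual_schema[column] != expected_type:
            problems.append(
                f"{name}.{column} must be of type {expected_type}, "
                f"found {actual_schema[column]}"
            )
    return problems


# Plain strings stand in for Spark DataType objects in this example.
print(check_table("airports_in_usa", 5,
                  {"iata_code": "string", "municipality": "string"},
                  [("iata_code", "string"), ("municipality", "string")]))  # []
```

A caller can `raise ValueError("; ".join(problems))` when the list is non-empty, which reports every failing column at once instead of stopping at the first mismatch.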

In [49]:
temperatures_table = spark.read.parquet("temperatures.parquet")

In [50]:
if temperatures_table.count() == 0:
    raise ValueError("temperatures table is empty")

In [51]:
temperatures_table.printSchema()

root
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- avg_temperature: float (nullable = true)
 |-- avg_temperature_uncertainty: float (nullable = true)
 |-- date: date (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)



In [52]:
temperatures_schema_test = [("city", StringType()),
                           ("country", StringType()),
                           ("latitude", StringType()),
                           ("longitude", StringType()),
                           ("avg_temperature", FloatType()),
                           ("avg_temperature_uncertainty", FloatType()),
                           ("date", DateType()),
                           ("month", IntegerType()),
                           ("year", IntegerType())]
for test in temperatures_schema_test:
    column = test[0]
    expected = test[1]
    if temperatures_table.schema[column].dataType != expected:
        raise ValueError("temperatures.{} must be of type {}".format(column, expected))


In [53]:
airports_table = spark.read.parquet("airports_in_usa.parquet")

In [54]:
airports_table.printSchema()

root
 |-- iata_code: string (nullable = true)
 |-- municipality: string (nullable = true)



In [55]:
airports_table.show(5)

+---------+-------------+
|iata_code| municipality|
+---------+-------------+
|      OCA|    Key Largo|
|      PQS|Pilot Station|
|      CSE|Crested Butte|
|      JCY| Johnson City|
|      PMX|       Palmer|
+---------+-------------+
only showing top 5 rows



In [56]:
if airports_table.count() == 0:
    raise ValueError("airports_in_usa is empty")

In [57]:
airports_schema_test = [("municipality", StringType()),
                        ("iata_code", StringType())]

In [58]:
for test in airports_schema_test:
    column = test[0]
    expected = test[1]
    if airports_table.schema[column].dataType != expected:
        raise ValueError("airports_in_usa.{} must be of type {}".format(column, expected))

In [59]:
us_visitors_table = spark.read.parquet("us_visitors.parquet")

In [60]:
if us_visitors_table.count() == 0:
    raise ValueError("us_visitors table is empty")

In [61]:
us_visitors_table.printSchema()

root
 |-- arrival_date: date (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- first_state_visited: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- country_of_residence: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- airport_municipality: string (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_month: integer (nullable = true)



In [62]:
us_visitors_schema_test = [("arrival_date", DateType()),
                            ("iata_code", StringType()),
                            ("first_state_visited", StringType()),
                            ("birth_year", IntegerType()),
                            ("age", IntegerType()),
                            ("country_of_residence", StringType()),
                            ("arrival_year", IntegerType()),
                            ("arrival_month", IntegerType()),
                            ("visa_type", StringType()),
                            ("airport_municipality", StringType())]

for test in us_visitors_schema_test:
    column = test[0]
    expected = test[1]
    if us_visitors_table.schema[column].dataType != expected:
        raise ValueError("us_visitors.{} must be of type {}".format(column, expected))

In [63]:
demographics_table = spark.read.parquet("us_demographics.parquet")

In [65]:
if demographics_table.count() == 0:
    raise ValueError("us_demographics table is empty")

In [66]:
demographics_table.printSchema()
us_demographics_schema_test = [("city", StringType()),
                               ("state", StringType()),
                               ("state_code", StringType()),
                               ("male_population", LongType()),
                               ("female_population", LongType()),
                               ("total_population", LongType()),
                               ("foreign_born", LongType()),
                               ("avg_household_size", IntegerType()),
                               ("median_age", FloatType()),
                               ("race", StringType())]

for test in us_demographics_schema_test:
    column = test[0]
    expected = test[1]
    if demographics_table.schema[column].dataType != expected:
        raise ValueError("demographics.{} must be of type {}".format(column, expected))

root
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- male_population: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- total_population: long (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- avg_household_size: integer (nullable = true)
 |-- median_age: float (nullable = true)
 |-- race: string (nullable = true)
 |-- state_code: string (nullable = true)



#### Data dictionary

The data is organized in four tables:
* Dimension tables: airports_in_usa, us_demographics
* Fact tables: temperatures, us_visitors

##### airports_in_usa
This table contains US airports with their IATA code and the name of the municipality where each airport is located.
More information about the original dataset of airport codes is available here: https://datahub.io/core/airport-codes#readme

Columns:
* iata_code (string): IATA Code of the airport
* municipality (string): name of jurisdiction where the airport is located

##### us_demographics
This table contains information about the population demographics of cities in the USA based on Census survey data of 2015.
More information about the original source of dataset can be found here: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/information/

Columns:
* city (string): name of the city
* state (string): name of the US state
* state_code (string): two letter abbreviated state code for the state
* male_population (long): population count of males
* female_population (long): population count of females
* total_population (long): total population of the city
* foreign_born (long): population count of foreign born residents
* avg_household_size (integer): average household size
* median_age (float): median age of people
* race (string): race of the people

##### temperatures
This table contains information on average temperature of cities around the world on each day measured from years 1750 to 2013.
More information about the original data source is here: https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data
Columns:
* city (string): name of city
* country (string): name of country
* latitude (string): latitude of the city
* longitude (string): longitude of the city
* avg_temperature (float): average temperature of the day
* avg_temperature_uncertainty (float): the 95% confidence interval around the average temperature
* date (date): date when the reading of temperature was taken
* month (integer): month of the temperature reading
* year (integer): year of the temperature reading

##### us_visitors
This table contains details of visitors arriving in the USA at various ports during 2016.
More details about the original data source: https://travel.trade.gov/research/reports/i94/historical/2016.html

Columns:
* arrival_date (date): date of arrival of visitor
* arrival_year (integer): year of arrival of visitor
* arrival_month (integer): month of arrival of visitor
* country_of_residence (string): country of residence of visitor
* iata_code (string): IATA code of airport where the visitor arrived
* first_state_visited (string): two letter abbreviation of state code to be first visited by visitor
* birth_year (integer): birth year of visitor
* age (integer): age of visitor on arrival to USA
* visa_type (string): type of visa of visitor
* airport_municipality (string): jurisdiction where airport is located


In this project, I chose to work with Apache Spark for performing the ETL tasks. Spark's dataframe API provides a very convenient way to wrangle data using powerful SQL-like operations. After extracting data from the downloaded datasets with Spark and performing the necessary transformations, I saved the final tables of the data model in Parquet format. Parquet stores the schema alongside the data, so the tables can be read back later with the data types and schema assigned to their columns during the ETL process.

To keep gaining insights from this dataset on a regular basis, the data stored in the tables must be kept up to date. How often each table is updated depends on how often its underlying source dataset is updated.

**temperatures table:**
This table is loaded from the original dataset containing the average temperature of cities around the year. Because the entries in this dataset are at day-level granularity, the table should ideally be updated daily, if the required data is available from an organization such as NASA.

**us_visitors table:**
This table is loaded from the dataset about visitors arriving at various airports in the USA around the year. As the records in this source dataset have day-level granularity, this table should ideally be updated daily, provided the source dataset is also updated daily.

**airports_in_usa table:**
It depends on the dataset of airport codes and locations of airports around the world. This data changes relatively infrequently, so updating it once per month should be sufficient.

**us_demographics table:**
This table holds data about the population of different genders and races across cities in the USA. It is derived from Census survey data, which is collected about once a year, so this table should be updated once per year.
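The refresh cadences above can be captured in one place. A minimal sketch, assuming daily refreshes for the two Fact tables, the first day of each month for `airports_in_usa`, and January 1 for `us_demographics`; the specific days, the `REFRESH_CADENCE` mapping and the `tables_due` function are illustrative choices, not requirements of the source data.

```python
from datetime import date
from typing import Dict, List

# Hypothetical cadence labels derived from the update frequencies described above.
REFRESH_CADENCE: Dict[str, str] = {
    "temperatures": "daily",
    "us_visitors": "daily",
    "airports_in_usa": "monthly",
    "us_demographics": "yearly",
}


def tables_due(run_date: date, cadence: Dict[str, str] = REFRESH_CADENCE) -> List[str]:
    """Return the tables that should be refreshed on run_date."""
    due = []
    for table, frequency in cadence.items():
        if frequency == "daily":
            due.append(table)
        elif frequency == "monthly" and run_date.day == 1:
            due.append(table)
        elif frequency == "yearly" and (run_date.month, run_date.day) == (1, 1):
            due.append(table)
    return due


print(tables_due(date(2016, 3, 15)))  # ['temperatures', 'us_visitors']
print(tables_due(date(2017, 1, 1)))   # all four tables
```

Keeping the cadence as data rather than as separate scheduled jobs means a single daily run can decide which loads to execute.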

If the dataset underlying this project grew by 100x, the RAM of a single commodity machine would no longer be sufficient to process it within a reasonable timeframe. In that scenario, it is preferable to run the pipeline on an Apache Spark cluster with tens of compute nodes, so that the work is parallelized across the memory of multiple machines.

If the dataset is used to populate a dashboard that must be updated by 7am every day, I would set up the data pipeline using Apache Airflow. There I would create a DAG (Directed Acyclic Graph) of inter-dependent tasks: downloading fresh datasets from the web, extracting data from the downloaded files into an Apache Spark cluster, wrangling the dataframes in Spark to apply the necessary transformations, combining the datasets, and loading the designated tables in the schema. I would give the DAG a daily schedule that triggers around 5am (for example the cron expression `0 5 * * *`), so that the data is updated by 7am even after the necessary retries in case of failures.
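Airflow runs the tasks of a DAG in dependency order. The ordering logic itself can be sketched with Python's standard-library `graphlib` (Python 3.9+), without Airflow; the task names below are hypothetical labels for the steps listed above, not Airflow operators, and making `transform_visitors` depend on `transform_airports` reflects the airport-municipality join.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Each task maps to the set of tasks that must finish before it starts.
PIPELINE: Dict[str, Set[str]] = {
    "download_datasets": set(),
    "extract_to_spark": {"download_datasets"},
    "transform_airports": {"extract_to_spark"},
    "transform_demographics": {"extract_to_spark"},
    "transform_temperatures": {"extract_to_spark"},
    "transform_visitors": {"extract_to_spark", "transform_airports"},
    "load_tables": {
        "transform_airports",
        "transform_demographics",
        "transform_temperatures",
        "transform_visitors",
    },
    "quality_checks": {"load_tables"},
}


def run_order(graph: Dict[str, Set[str]]) -> List[str]:
    """Return one valid execution order; raises graphlib.CycleError on a cycle."""
    return list(TopologicalSorter(graph).static_order())


order = run_order(PIPELINE)
print(order[0], order[-1])  # download_datasets quality_checks
```

In an Airflow DAG file the same dependencies would be declared between operators (for example with `>>`), and the daily 5am schedule described above would be set on the DAG itself.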

If the database needed to be accessed by, say, 100+ people in a company, I would host the data tables in a data warehouse such as an Amazon Redshift cluster. Its columnar storage would allow faster aggregations over the different attributes of the Fact tables, even when many users query them simultaneously. I might also add nodes or use larger instance types in the cluster so that analytical queries are answered within a reasonable response time.