# US Immigration & Demographics Analysis
### Data Engineering Capstone Project

#### Project Summary
This project is part of Udacity's Data Engineering Nanodegree program, in which participating students build an ETL process. The capstone project is a chance to combine what was learned throughout the program and apply it end to end. For this project, I chose to complete the capstone project provided by Udacity itself.

This project analyzes US immigration and demographics data. Using the datasets, we can answer questions such as:

* Which mode of transport is most used for immigration?
* Is the demographics data directly proportional to the immigration data?
* What role does global temperature play in immigration?
* What is the immigration per month or per year?
* What is the immigration by country of origin?
* Legal vs. illegal immigration

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# Do all imports and installs here
import configparser

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, regexp_replace, col
```

### Step 1: Scope the Project and Gather Data

#### Scope 
Using the datasets in this project, we can analyze US immigration and demographics per state and year, and how they are affected by the other parameters in the datasets.

#### The datasets used:
1. I94 Immigration Data: This data comes from the US National Tourism and Trade Office, [here](https://travel.trade.gov/research/reports/i94/historical/2016.html)
2. World Temperature Data: This dataset comes from Kaggle, [here](https://www.kaggle.com/datasets/berkeleyearth/climate-change-earth-surface-temperature-data)
3. U.S. City Demographic Data: This data comes from OpenDataSoft and contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000, [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
4. Airport Code Table: This is a simple table of airport codes and corresponding cities. The airport codes may refer to either the IATA airport code, a three-letter code used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code, a four-letter code used by ATC systems and for airports that do not have an IATA airport code, [here](https://datahub.io/core/airport-codes#data)

I have completed this project using Apache Spark, PostgreSQL and Apache Airflow.

### Step 2: Explore and Assess the Data

The following cells gather the data from the different sources, assess it and clean it up.

```python
# Creating the Spark session and including the PostgreSQL JDBC driver jar
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .config("spark.jars", "postgresql-42.5.0.jar") \
    .config("spark.executor.extraClassPath", "postgresql-42.5.0.jar") \
    .getOrCreate()
```

```python
# Reading database config values from dl.cfg
config = configparser.ConfigParser()
config.read("/Users/samar/airflow/dl.cfg")
host = config['POSTGRES']['HOST']
port = config['POSTGRES']['PORT']
dbname = config['POSTGRES']['DBNAME']
user = config['POSTGRES']['USER']
password = config['POSTGRES']['PASSWORD']
```
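Every write cell below repeats the same JDBC URL and connection properties. A minimal sketch of building them once from the `[POSTGRES]` section follows. The function name `jdbc_settings` and the sample config values are hypothetical. The config is read from a string only so that the example is self-contained; in the notebook, the `config` object loaded from `dl.cfg` would be passed instead.

```python
import configparser


def jdbc_settings(config):
    """Return (jdbc_url, properties) built from the [POSTGRES] section of a ConfigParser."""
    pg = config["POSTGRES"]  # key lookups are case-insensitive in configparser
    url = f"jdbc:postgresql://{pg['HOST']}:{pg['PORT']}/{pg['DBNAME']}"
    properties = {
        "user": pg["USER"],
        "password": pg["PASSWORD"],  # the password, not the user name
        "driver": "org.postgresql.Driver",
    }
    return url, properties


# Hypothetical values, laid out like the [POSTGRES] section of dl.cfg
SAMPLE_CFG = """
[POSTGRES]
HOST = localhost
PORT = 5432
DBNAME = immigration
USER = student
PASSWORD = secret
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)
url, properties = jdbc_settings(config)
print(url)
```

Each write cell could then call `df.write.mode("overwrite").jdbc(url, "staging_immigration", properties=properties)`. `DataFrameWriter.jdbc` accepts the URL, the table name and a properties dict, so the credentials are defined in one place.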

#### Immigration data

```python
# Reading US immigration data
df = spark.read.parquet("data/sas_data/")
df.show(10, False)
```

```
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|cicid    |i94yr |i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto |gender|insnum|airline|admnum        |fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|4.0   |245.0 |438.0 |LOS    |20574.0|1.0    |CA     |20582.0|40.0  |1.0    |1.0  |20160430|SYD     |null |G      |O      |nu
```

```python
# Write data from the Spark dataframe to PostgreSQL
df.write.mode("overwrite").jdbc(f"jdbc:postgresql://{host}:{port}/{dbname}", "staging_immigration",
          properties={"user": f"{user}", "password": f"{password}", "driver": 'org.postgresql.Driver'})
```

#### US Demographics data

```python
# Reading US demographics data
df = spark.read.csv('data/us-cities-demographics.csv', header=True, sep=';')
df.show(5, False)
```

```
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+-------------------------+-----+
|City            |State        |Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|Race                     |Count|
+----------------+-------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+-------------------------+-----+
|Silver Spring   |Maryland     |33.8      |40601          |41862            |82463           |1562              |30908       |2.6                   |MD        |Hispanic or Latino       |25924|
|Quincy          |Massachusetts|41.0      |44129          |49500            |93629           |4147              |32935       |2.39                  |MA        |White                    |58723|
|Hoover          |Alabama      |38.
```

```python
# Renaming columns to snake_case (the source file has no 'Size' column, so it is not renamed)
demographics = df.withColumnRenamed('City','city')\
                .withColumnRenamed('State','state')\
                .withColumnRenamed('Median Age','median_age')\
                .withColumnRenamed('Male Population','male_population')\
                .withColumnRenamed('Female Population','female_population')\
                .withColumnRenamed('Total Population','total_population')\
                .withColumnRenamed('Number of Veterans','no_of_veterans')\
                .withColumnRenamed('Foreign-born','foreign_born')\
                .withColumnRenamed('Average Household Size','avg_household_size')\
                .withColumnRenamed('State Code','state_code')\
                .withColumnRenamed('Race','race')\
                .withColumnRenamed('Count','count')
demographics.show(5, False)
demographics.count()
```

```
+----------------+-------------+----------+---------------+-----------------+----------------+--------------+------------+------------------+----------+-------------------------+-----+
|city            |state        |median_age|male_population|female_population|total_population|no_of_veterans|foreign_born|avg_household_size|state_code|race                     |count|
+----------------+-------------+----------+---------------+-----------------+----------------+--------------+------------+------------------+----------+-------------------------+-----+
|Silver Spring   |Maryland     |33.8      |40601          |41862            |82463           |1562          |30908       |2.6               |MD        |Hispanic or Latino       |25924|
|Quincy          |Massachusetts|41.0      |44129          |49500            |93629           |4147          |32935       |2.39              |MA        |White                    |58723|
|Hoover          |Alabama      |38.5      |38040          |46799           

2891
```

```python
# write data from spark dataframe to postgres
demographics.write.mode("overwrite").jdbc(f"jdbc:postgresql://{host}:{port}/{dbname}", "staging_demographics",
          properties={"user": f"{user}", "password": f"{password}", "driver": 'org.postgresql.Driver'})
```

#### Airport codes data

```python
# Reading the airport code data and splitting 'coordinates' into its parts.
# The coordinates column is "longitude, latitude" (e.g. -74.93, 40.07 for Bensalem, PA),
# so item 0 is the longitude and item 1 is the latitude.
df = spark.read.csv('data/airport-codes_csv.csv', header=True, inferSchema=True)
airport_codes = df.withColumn('longitude', split('coordinates', ', ').getItem(0))\
    .withColumn('latitude', split('coordinates', ', ').getItem(1))

airport_codes.show(5, False)
```

```
+-----+-------------+----------------------------------+------------+---------+-----------+----------+------------+--------+---------+----------+-------------------------------------+------------------+-----------------+
|ident|type         |name                              |elevation_ft|continent|iso_country|iso_region|municipality|gps_code|iata_code|local_code|coordinates                          |longitude         |latitude         |
+-----+-------------+----------------------------------+------------+---------+-----------+----------+------------+--------+---------+----------+-------------------------------------+------------------+-----------------+
|00A  |heliport     |Total Rf Heliport                 |11          |NA       |US         |US-PA     |Bensalem    |00A     |null     |00A       |-74.93360137939453, 40.07080078125   |-74.93360137939453|40.07080078125   |
|00AA |small_airport|Aero B Ranch Airport              |3435        |NA       |US         |US-KS     |Leoti       |0
```

```python
# write data from spark dataframe to postgres
airport_codes.write.mode("overwrite").jdbc(f"jdbc:postgresql://{host}:{port}/{dbname}", "staging_airport_codes",
          properties={"user": f"{user}", "password": f"{password}", "driver": 'org.postgresql.Driver'})
```

#### Global temperature data

```python
# Reading global temperature data
file_name = 'data/GlobalLandTemperaturesByCity.csv'
df = spark.read.csv(file_name, header=True, inferSchema=True)
df.show(10, False)
```

```
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|dt                 |AverageTemperature|AverageTemperatureUncertainty|City |Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|6.068             |1.7369999999999999           |Århus|Denmark|57.05N  |10.33E   |
|1743-12-01 00:00:00|null              |null                         |Århus|Denmark|57.05N  |10.33E   |
|1744-01-01 00:00:00|null              |null                         |Århus|Denmark|57.05N  |10.33E   |
|1744-02-01 00:00:00|null              |null                         |Århus|Denmark|57.05N  |10.33E   |
|1744-03-01 00:00:00|null              |null                         |Århus|Denmark|57.05N  |10.33E   |
|1744-04-01 00:00:00|5.7879999999999985|3.6239999999999997           |Århus|Denmark|57.05N  |10.33E   |
|1744-05-01 00:00:00|10.644            |1.2830000000000001      
```

```python
# Renaming columns to snake_case
df = df.withColumnRenamed('dt','date')\
                .withColumnRenamed('AverageTemperature','avg_temperature')\
                .withColumnRenamed('AverageTemperatureUncertainty','avg_temperature_uncertainty')\
                .withColumnRenamed('City','city')\
                .withColumnRenamed('Country','country')\
                .withColumnRenamed('Latitude','latitude')\
                .withColumnRenamed('Longitude','longitude')
df.show(10, False)
```

```
+-------------------+------------------+---------------------------+-----+-------+--------+---------+
|date               |avg_temperature   |avg_temperature_uncertainty|city |country|latitude|longitude|
+-------------------+------------------+---------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|6.068             |1.7369999999999999         |Århus|Denmark|57.05N  |10.33E   |
|1743-12-01 00:00:00|null              |null                       |Århus|Denmark|57.05N  |10.33E   |
|1744-01-01 00:00:00|null              |null                       |Århus|Denmark|57.05N  |10.33E   |
|1744-02-01 00:00:00|null              |null                       |Århus|Denmark|57.05N  |10.33E   |
|1744-03-01 00:00:00|null              |null                       |Århus|Denmark|57.05N  |10.33E   |
|1744-04-01 00:00:00|5.7879999999999985|3.6239999999999997         |Århus|Denmark|57.05N  |10.33E   |
|1744-05-01 00:00:00|10.644            |1.2830000000000001         |Århus|Denmark|
```

```python
# write data from spark dataframe to postgres
df.write.mode("overwrite").jdbc(f"jdbc:postgresql://{host}:{port}/{dbname}", "staging_global_temperature",
          properties={"user": f"{user}", "password": f"{password}", "driver": 'org.postgresql.Driver'})
```

#### Cleaning Steps for I94_SAS_Labels_Descriptions

This file is cleaned up with regular expressions and broken down into five staging tables:
1. staging_i94country
2. staging_i94port
3. staging_i94address
4. staging_i94mode
5. staging_i94visa

These lookup tables are later used together with the immigration table.
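The cleaning described above relies on regular expressions to pull `code = 'label'` pairs out of a SAS format file. A rough, Spark-free illustration of that idea follows: one pattern that extracts the code and label from several separator layouts (spaces, tabs, optional quotes, an optional trailing `;`). The sample lines are assumptions modelled on the filters used in the cells below, not copied from the file. The pattern would need to be checked against the real `I94_SAS_Labels_Descriptions.SAS`, because comment lines that happen to contain `=` could also match. The function name `parse_label_line` is hypothetical.

```python
import re

# code: an optionally quoted token without quotes, '=' or whitespace
# label: everything after '=', minus surrounding quotes, padding and a trailing ';'
PAIR_RE = re.compile(r"^\s*'?([^'=\s]+)'?\s*=\s*'?(.*?)'?\s*;?\s*$")


def parse_label_line(line):
    """Return (code, label) for a `code = 'label'` line, or None if it does not match."""
    match = PAIR_RE.match(line)
    if match is None:
        return None
    code, label = match.groups()
    return code, label.strip()


# Hypothetical sample lines in the country, port, mode, address and visa layouts
SAMPLE_LINES = [
    "   236 =  'AFGHANISTAN'",
    "\t'ALC'\t=\t'ALCAN, AK             '",
    "\t1 = 'Air'",
    "\t'AL'='ALABAMA'",
    "   1 = Business",
    "proc format library=library ;",
]

for line in SAMPLE_LINES:
    print(parse_label_line(line))
```

Java regular expressions support the same syntax, so a similar pattern could in principle be applied inside Spark with `regexp_extract`, instead of chaining several `regexp_replace` calls per table.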

```python
# Read I94_SAS_Labels_Descriptions for further processing
df = spark.read.text('data/I94_SAS_Labels_Descriptions.SAS')
df.show(10, False)
```

```
+----------------------------------------------------------------------------------------+
|value                                                                                   |
+----------------------------------------------------------------------------------------+
|libname library 'Your file location' ;                                                  |
|proc format library=library ;                                                           |
|                                                                                        |
|/* I94YR - 4 digit year */                                                              |
|                                                                                        |
|/* I94MON - Numeric month */                                                            |
|                                                                                        |
|/* I94CIT & I94RES - This format shows all the valid and invalid codes for processing */|
```

```python
# Extracting values for staging_i94country table
i94country = df.filter("value like '% =  %'").withColumn('value', regexp_replace(col("value"), " =  ", "="))\
        .withColumn('value', regexp_replace(col("value"), "'", ""))\
        .withColumn('value', regexp_replace(col("value"), "   ", ""))\
        .withColumn('value', regexp_replace(col("value"), " ;", ""))\
        .selectExpr('substring(value, 0, 3) as code', 'substring(value, 5, length(value)) as country')
i94country.show(10, False)
i94country.count()
```

```
+----+---------------------------------------------------------+
|code|country                                                  |
+----+---------------------------------------------------------+
|582 |MEXICO Air Sea, and Not Reported (I-94, no land arrivals)|
|236 |AFGHANISTAN                                              |
|101 |ALBANIA                                                  |
|316 |ALGERIA                                                  |
|102 |ANDORRA                                                  |
|324 |ANGOLA                                                   |
|529 |ANGUILLA                                                 |
|518 |ANTIGUA-BARBUDA                                          |
|687 |ARGENTINA                                                |
|151 |ARMENIA                                                  |
+----+---------------------------------------------------------+
only showing top 10 rows

289
```

```python
# write data from spark dataframe to postgres
i94country.write.mode("overwrite").jdbc(f"jdbc:postgresql://{host}:{port}/{dbname}", "staging_i94country",
          properties={"user": f"{user}", "password": f"{password}", "driver": 'org.postgresql.Driver'})
```

```python
# Extracting values for staging_i94port table
i94port = df.filter("value like '%\t=\t%'").withColumn('value', regexp_replace(col("value"), "\t=\t", "="))\
        .withColumn('value', regexp_replace(col("value"), "'", ""))\
        .withColumn('value', regexp_replace(col("value"), "   ", ""))\
        .withColumn('value', regexp_replace(col("value"), " =", "="))\
        .selectExpr('substring(value, 0, 3) as code', 'substring(value, 5, length(value)) as port')
i94port.show(10, False)
i94port.count()
```

```
+----+----------------------------+
|code|port                        |
+----+----------------------------+
|ALC |ALCAN, AK                   |
|ANC |ANCHORAGE, AK               |
|BAR |BAKER AAF - BAKER ISLAND, AK|
|DAC |DALTONS CACHE, AK           |
|PIZ |DEW STATION PT LAY DEW, AK  |
|DTH |DUTCH HARBOR, AK            |
|EGL |EAGLE, AK                   |
|FRB |FAIRBANKS, AK               |
|HOM |HOMER, AK                   |
|HYD |HYDER, AK                   |
+----+----------------------------+
only showing top 10 rows

660
```

```python
# write data from spark dataframe to postgres
i94port.write.mode("overwrite").jdbc(f"jdbc:postgresql://{host}:{port}/{dbname}", "staging_i94port",
          properties={"user": f"{user}", "password": f"{password}", "driver": 'org.postgresql.Driver'})
```

```python
# Extracting values for staging_i94mode table
i94mode = df.filter("value like '\t% = %'").withColumn('value', regexp_replace(col("value"), " = ", "="))\
        .withColumn('value', regexp_replace(col("value"), "\t", ""))\
        .withColumn('value', regexp_replace(col("value"), "'", ""))\
        .withColumn('value', regexp_replace(col("value"), " ;", ""))\
        .selectExpr('substring(value, 0, 1) as code', 'substring(value, 3, length(value)) as mode')
i94mode.show(i94mode.count(), False)
i94mode.count()
```

```
+----+------------+
|code|mode        |
+----+------------+
|1   |Air         |
|2   |Sea         |
|3   |Land        |
|9   |Not reported|
+----+------------+

4
```

```python
# write data from spark dataframe to postgres
i94mode.write.mode("overwrite").jdbc(f"jdbc:postgresql://{host}:{port}/{dbname}", "staging_i94mode",
          properties={"user": f"{user}", "password": f"{password}", "driver": 'org.postgresql.Driver'})
```

```python
# Extracting values for staging_i94address table
i94address = df.filter("value like '\t%\\'=\\'%'").withColumn('value', regexp_replace(col("value"), " = ", "="))\
        .withColumn('value', regexp_replace(col("value"), "\t", ""))\
        .withColumn('value', regexp_replace(col("value"), "'", ""))\
        .withColumn('value', regexp_replace(col("value"), " ;", ""))\
        .selectExpr('substring(value, 0, 2) as code', 'substring(value, 4, length(value)) as state')
i94address.show(10, False)
i94address.count()
```

```
+----+-----------------+
|code|state            |
+----+-----------------+
|AL  |ALABAMA          |
|AK  |ALASKA           |
|AZ  |ARIZONA          |
|AR  |ARKANSAS         |
|CA  |CALIFORNIA       |
|CO  |COLORADO         |
|CT  |CONNECTICUT      |
|DE  |DELAWARE         |
|DC  |DIST. OF COLUMBIA|
|FL  |FLORIDA          |
+----+-----------------+
only showing top 10 rows

55
```

```python
# write data from spark dataframe to postgres
i94address.write.mode("overwrite").jdbc(f"jdbc:postgresql://{host}:{port}/{dbname}", "staging_i94address",
          properties={"user": f"{user}", "password": f"{password}", "driver": 'org.postgresql.Driver'})
```

```python
# Extracting values for staging_i94visa table
i94visa = df.filter("value like '   _ =%'").withColumn('value', regexp_replace(col("value"), " = ", "="))\
        .withColumn('value', regexp_replace(col("value"), "   ", ""))\
        .selectExpr('substring(value, 0, 1) as code', 'substring(value, 3, length(value)) as type')
i94visa.show(i94visa.count(), False)
i94visa.count()
```

```
+----+--------+
|code|type    |
+----+--------+
|1   |Business|
|2   |Pleasure|
|3   |Student |
+----+--------+

3
```

```python
# write data from spark dataframe to postgres
i94visa.write.mode("overwrite").jdbc(f"jdbc:postgresql://{host}:{port}/{dbname}", "staging_i94visa",
          properties={"user": f"{user}", "password": f"{password}", "driver": 'org.postgresql.Driver'})
```

```python
spark.stop()
```

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
I used PostgreSQL locally for data modeling. The staging tables and the ERD are shown in section 4.1 below.



#### 3.2 Mapping Out Data Pipelines
The data pipeline is scheduled to run once every month to analyze the change in the dataset. The scheduling is done via Apache Airflow.
The steps are as follows:
* Begin Execution
* Run task load_staging_tables: this executes the us_immigration_and_demographics_staging.py script to create and load the staging tables in the PostgreSQL database
* Run task create_final_tables: this creates the final tables as per the ERD described below
* Begin loading the final tables as per the ERD with the Start_loading dummy operator
* The first loading task is load_global_temperature_table. This table holds a large volume of data, and the task sometimes fails with exitcode=<Negsignal.SIGKILL: -9>. To keep Airflow from terminating this long-running task with SIGTERM, the execution timeout for the Postgres task has been increased to 3600 seconds. The other tables start loading only after this table has loaded successfully, so if it fails, the entire DAG run fails immediately.
* The remaining tables are then loaded in parallel: load_airport_codes_table, load_demographics_table, load_i94address_table, load_i94country_table, load_i94mode_table, load_i94port_table and load_i94visa_table
* Finally, the immigration table is loaded with load_immigration_table task
* Once all the tables are correctly loaded, Run_data_quality_checks is executed to confirm data quality
* Stop execution
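The task order above can be written down as a dependency graph. The sketch below uses Python's standard-library `graphlib` (Python 3.9+) rather than Airflow, only to illustrate which tasks can run together. The task names come from the list above. `begin_execution` and `stop_execution` are hypothetical names for the start and stop steps, and the assumption that load_immigration_table waits for all seven parallel loads is read from "Finally" in the list. In the actual DAG, the same structure would be expressed with Airflow's `>>` operator.

```python
from graphlib import TopologicalSorter

PARALLEL_LOADS = [
    "load_airport_codes_table",
    "load_demographics_table",
    "load_i94address_table",
    "load_i94country_table",
    "load_i94mode_table",
    "load_i94port_table",
    "load_i94visa_table",
]

# Each task maps to the set of tasks that must finish before it can start.
DEPENDENCIES = {
    "load_staging_tables": {"begin_execution"},
    "create_final_tables": {"load_staging_tables"},
    "Start_loading": {"create_final_tables"},
    "load_global_temperature_table": {"Start_loading"},
    **{task: {"load_global_temperature_table"} for task in PARALLEL_LOADS},
    "load_immigration_table": set(PARALLEL_LOADS),
    "Run_data_quality_checks": {"load_immigration_table"},
    "stop_execution": {"Run_data_quality_checks"},
}


def execution_waves(dependencies):
    """Group tasks into waves; tasks within one wave do not depend on each other."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # also raises CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


for wave in execution_waves(DEPENDENCIES):
    print(wave)
```

Placing load_global_temperature_table in a wave of its own reflects the fail-fast choice described above: if it fails, none of the later loads start.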


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
##### Staging Tables:
![Staging Tables](images/staging_diagram.png)

##### ERD:
![ERD](images/ERD.png)

##### Airflow graph view:
![DAG_GRAPH](images/DAG.png)

##### Run the data pipelines to create the data model
* Configure the Postgres connection in the Airflow UI under Admin >> Connections >> **+** (Add a new record), as shown below:

![Postgres conn in Airflow](images/Airflow_Postgres_Conn.png)

* To run the data pipeline with Airflow, make sure the directory and folder structure is correct and accessible to Airflow, either by setting PYTHONPATH correctly or by adding the paths with sys.path.insert()
* Add postgresql-42.5.0.jar to the Spark config so that Spark can use it to set up the PostgreSQL connection over JDBC
* Launch Airflow by running `airflow scheduler` and `airflow webserver`
* Access the Airflow UI at http://localhost:8080
* Activate and trigger us_immigration_and_demographics_dag to execute it


* The code for the data pipelines can be found in the respective Python script files.
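One way to make the pipeline's helper scripts importable from the DAG file is to prepend their directory to `sys.path`, as mentioned in the setup steps above. A minimal sketch follows. The function name `add_to_path` and the `scripts` folder in the comment are hypothetical and depend on how the project is laid out.

```python
import os
import sys


def add_to_path(directory):
    """Prepend `directory` (as an absolute path) to sys.path if it is not already there."""
    directory = os.path.abspath(directory)
    if directory not in sys.path:
        sys.path.insert(0, directory)
    return directory


# Hypothetical usage inside a DAG file, assuming the helper scripts live in a
# "scripts" folder next to it:
# add_to_path(os.path.join(os.path.dirname(__file__), "scripts"))
```

Setting PYTHONPATH in the environment that launches `airflow scheduler` achieves the same result without touching the DAG code.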

#### 4.2 Data Quality Checks
The data quality checks are performed to ensure the pipeline ran as expected. These include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
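A source/count check can be as simple as asserting that every final table has at least one row. The sketch below is hypothetical and is not the project's Run_data_quality_checks implementation. It works with any DB-API 2.0 cursor (for example one from `psycopg2`). The in-memory SQLite database exists only to make the example runnable. Table names are interpolated into the SQL, so only trusted, hard-coded names should be passed.

```python
import sqlite3


def check_row_counts(cursor, tables):
    """Raise ValueError if any table is empty; otherwise return {table: row_count}."""
    counts = {}
    for table in tables:
        # Table names cannot be bound as query parameters, so they must be trusted.
        cursor.execute(f"SELECT COUNT(*) FROM {table}")
        (count,) = cursor.fetchone()
        if count == 0:
            raise ValueError(f"Data quality check failed: {table} has no rows")
        counts[table] = count
    return counts


# Self-contained demo with throwaway tables
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE i94mode (code INTEGER, mode TEXT)")
cur.executemany(
    "INSERT INTO i94mode VALUES (?, ?)",
    [(1, "Air"), (2, "Sea"), (3, "Land"), (9, "Not reported")],
)
cur.execute("CREATE TABLE i94visa (code INTEGER, type TEXT)")  # left empty on purpose
print(check_row_counts(cur, ["i94mode"]))
```

Comparing these counts with the counts of the matching staging tables would turn this into a completeness check rather than only a non-empty check.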
 
The quality checks are run by the Run_data_quality_checks task. Its logs are shown below:

##### Logs:
![logs](images/logs_Run_data_quality_checks.png)

#### 4.3 Data dictionary 

The data dictionary is as follows:


1. The immigration dataset is dynamic and is expected to change with each run

**immigration:-**
* cicid: int8, CICID is a unique number for each immigrant. (No null values found)
* i94yr: int4, 4 digit year
* i94mon: int4, Numeric month
* i94cit: int4, country of citizenship
* i94res: int4, country of residence - from where one has travelled. (No null values found)
* i94port: varchar(5), Shows all the valid and invalid codes for Ports
* arrdate: int8, the arrival date in the USA. It is a SAS numeric date field to which a permanent format has not been applied. (Convert it to timestamp format)
* i94mode: int4, mode of arrival. There are missing values as well as "not reported" values
* i94addr: varchar(5), destination address, i.e. where the immigrant resides in the USA. Invalid codes are marked as 'other'
* depdate: int8, the departure date from the USA. It is a SAS numeric date field to which a permanent format has not been applied. (Convert it to timestamp format)
* i94bir: int4, Age of Respondent in Years 
* i94visa: int4, Visa codes 
* count: int4, Used for summary statistics 
* dtadfile: int8, Character Date Field - Date added to I-94 Files - CIC does not use 
* visapost: varchar(5), Department of State where Visa was issued 
* occup: varchar(5), Occupation that will be performed in U.S. 
* entdepa: char(1), Arrival Flag - admitted or paroled into the U.S.
* entdepd: char(1), Departure Flag - Departed, lost I-94 or is deceased
* entdepu: char(1), Update Flag - Either apprehended, overstayed, adjusted to perm residence
* matflag: char(1), Match flag - Match of arrival and departure records
* biryear: int4, 4 digit year of birth 
* dtaddto: int8, Character Date Field - Date to which admitted to U.S. (allowed to stay until) 
* gender: char(1), Non-immigrant sex 
* insnum: varchar, INS number 
* airline: varchar(5), Airline used to arrive in U.S. 
* admnum: int8, Admission Number 
* fltno: varchar(50), Flight number of Airline used to arrive in U.S. 
* visatype: varchar(5), Class of admission legally admitting the non-immigrant to temporarily stay in U.S.
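`arrdate` and `depdate` are SAS date values, i.e. the number of days since 1960-01-01. A minimal standard-library sketch of the conversion is shown below. The function name `sas_to_date` is hypothetical. For the first sample record shown in Step 2, `arrdate = 20574.0` maps to 2016-04-30, which matches that record's `dtadfile` of `20160430`.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day


def sas_to_date(sas_days):
    """Convert a SAS date (days since 1960-01-01, possibly a float) to a date; None stays None."""
    if sas_days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


print(sas_to_date(20574.0))  # arrdate of the first sample record
print(sas_to_date(20582.0))  # depdate of the same record
```

In Spark, the same arithmetic could be expressed with the SQL `date_add` function, for example `expr("date_add('1960-01-01', cast(arrdate as int))")`, so that the conversion runs on the cluster instead of row by row in Python.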
---

2. As the immigration dataset changes, the demographics dataset is also expected to change

**demographics:-**
* city: varchar(50)
* state: varchar(50)
* median_age: float
* male_population: int8       
* female_population: int8
* total_population: int8 
* no_of_veterans: int8  
* foreign_born: int8            
* avg_household_size: float  
* state_code: char(2)              
* race: varchar(100)                  
* count: int8
---

3. This dataset is less dynamic and is not expected to change frequently

**airport_codes:-**
* ident: varchar(10)       
* type: varchar(50)         
* name: varchar(50)        
* elevation_ft: float 
* continent: varchar(50)    
* iso_country: varchar(50)     
* iso_region: varchar(50)      
* municipality: varchar(50)   
* gps_code: varchar(50)        
* iata_code: varchar(50)       
* local_code: varchar(50)
* coordinates: varchar(100), split further into latitude and longitude
* latitude: float
* longitude: float
---

4. This is a dynamic dataset, expected to change frequently over time

**global_temperature:-**
* date: timestamp
* avg_temperature: float
* avg_temperature_uncertainty: float
* city: varchar(100)
* country: varchar(100)
* latitude: varchar(100)
* longitude: varchar(100)
---

5. This is a constant dataset. Expected change is rare.

**i94visa:** 
* code: int4, numeric code of visa type
* type: varchar(50), type of visa
---

6. This is a constant dataset. Expected change is rare

**i94address_state:**
* code: varchar(5), the two-letter state code; the numeric codes identified as invalid are also included
* state: varchar(100), US-state
---

7. This is a constant dataset. Expected change is rare

**i94mode:**
* code: int4, numeric code for mode of transport
* mode: varchar(100), mode of transport
---

8. This is a constant dataset. Expected change is rare

**i94port:**
* code: varchar(5)
* port: varchar(100)
---

9. This is a constant dataset. Expected change is rare

**i94country:**
* code: int4
* country: varchar(100)



### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.

1. Rationale for the choice of tools and technologies for the project:
I completed the project using Apache Spark, Apache Airflow and PostgreSQL.
* Spark can process huge amounts of data distributed across a cluster of thousands of cooperating physical or virtual servers. It has an extensive set of developer libraries and APIs, and supports languages such as Java, Python, R and Scala.
* Airflow makes data pipelines manageable as DAGs. Task management, scheduling as code, visualization of the pipelines' dependencies, progress, logs and code, task triggering and success status are all easy to handle.
* PostgreSQL is a powerful, open-source object-relational database system. I used it to store the project data and run the analysis.

Note that I could also have implemented the project using AWS Redshift and S3. However, I chose to use PostgreSQL locally because I didn't have enough Udacity AWS credits left. Nonetheless, I got a good grasp of those services in the DWH, Spark and Airflow projects of the Nanodegree.

2. How often the data should be updated and why: this depends on the business requirements. However, a monthly update should be enough to detect changes in the data.

3. Describe your approach to the following scenarios:
* The data was increased by 100x: increase the cluster size if using AWS (EMR, Redshift). Increase the DAG run timeout in Apache Airflow so that long-running tasks are not killed, and give the workers enough memory to avoid out-of-memory failures. Indexing, the Parquet format and data partitioning also help.
* The data populates a dashboard that must be updated on a daily basis by 7am every day: change the Airflow schedule to a daily run that starts early enough to finish before 7am. For example, a cron schedule such as `0 5 * * *` could work, with the exact start time depending on how long a run takes.
* The database needed to be accessed by 100+ people: create users with specific roles and access credentials, e.g. users with read-only access.

## Example Analytical SQL Queries for the dataset:

1. **Top 5 countries of citizenship by number of immigrants**

```sql
select count(cicid) as num, i94cit, c.country
from immigration i join i94country c
on (i.i94cit = c.code)
group by i94cit, c.country
order by num desc
limit 5
```

![top_5_highest_number_of_immigrants](images/top_5_country.png)

2. **Total immigrants per mode**

```sql
select count(i.cicid), i.i94mode, m.mode
from immigration i join i94mode m
on (i.i94mode = m.code)
group by i.i94mode, m.mode
order by count(i.cicid) desc
```

![immigrants_per_mode](images/immigrants_per_mode.png)




