**FAQs** <br>
- How can I read multiple .sas7bdat files into a data frame? <br>
https://knowledge.udacity.com/questions/280093
- Feedback on sample data architecture and database schema <br>
https://knowledge.udacity.com/questions/131241
- How do I troubleshoot "Error: Vpc associated with db subnet group does not exist"? <br>
https://knowledge.udacity.com/questions/65605

# United States Immigration  
### Data Engineering Capstone Project

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

<a id='intro'></a>
## Table of Contents
<ul>
<li><a href="#intro"><b>Project Summary</b></a></li>
<li><a href="#step1">Step 1: Scope the Project and Gather Data</a></li>
<li><a href="#step2">Step 2: Explore and Assess the Data</a></li>
<li><a href="#step3">Step 3: Define the Data Model</a></li>
<li><a href="#step4">Step 4: Run ETL to Model the Data</a></li>
<li><a href="#step5">Step 5: Complete Project Write Up</a></li>
</ul>

#### Project Summary
Preparing the data for "Immigration" and "U.S. State Demographics" Analytics

<br>

In [1]:
# Do all imports and installs here
import pandas as pd
from time import time
import pickle
from zipfile import ZipFile 
from pprint import pprint
import re

import pyspark.sql.functions as F
import pyspark.sql.types as T

from datetime import datetime, timedelta

<br>

<a id='step1'></a>
### Step 1: Scope the Project and Gather Data
<ul>
 <li><a href="#intro">Project Summary</a></li>
 <li><a href="#step1"><b>Step 1: Scope the Project and Gather Data</b></a></li>
 <li><a href="#step2">Step 2: Explore and Assess the Data</a></li>
 <li><a href="#step3">Step 3: Define the Data Model</a></li>
 <li><a href="#step4">Step 4: Run ETL to Model the Data</a></li>
 <li><a href="#step5">Step 5: Complete Project Write Up</a></li>
 </ul>

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

# Datasets

<a href='https://travel.trade.gov/research/reports/i94/historical/2016.html'><b>I94 Immigration Data:</b></a>
- This data comes from the US National Travel and Tourism Office. 
- A data dictionary is included in the workspace. 
- There's a sample file so you can take a look at the data in csv format before reading it all in. 
- You do not have to use the entire dataset, just use what you need to accomplish the goal you set at the beginning of the project.
- https://i94.cbp.dhs.gov/I94/#/home
- The immigration data and the global temperature data are on an attached disk.
- You can access the immigration data in a folder with the following path: ../../data/18-83510-I94-Data-**2016**/. 
- There's a file for each month of the year. An example file name is i94_apr16_sub.sas7bdat.
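
Because the folder holds one file per month, the twelve paths can be generated instead of typed out. The sketch below assumes every file follows the pattern of the example name (`i94_<mon>16_sub.sas7bdat`, with a lowercase three-letter month abbreviation). Only the April name is confirmed above, so check the folder listing before relying on it; `monthly_sas_paths` is a hypothetical helper, not part of the workspace.

```python
from pathlib import PurePosixPath

# Folder path taken from the workspace description above.
DATA_DIR = PurePosixPath("../../data/18-83510-I94-Data-2016")

# Assumption: all months use the same naming scheme as i94_apr16_sub.sas7bdat.
MONTHS = ["jan", "feb", "mar", "apr", "may", "jun",
          "jul", "aug", "sep", "oct", "nov", "dec"]


def monthly_sas_paths(data_dir=DATA_DIR, months=MONTHS, year_suffix="16"):
    """Return one candidate .sas7bdat path per month, in calendar order."""
    return [str(data_dir / f"i94_{mon}{year_suffix}_sub.sas7bdat") for mon in months]


paths = monthly_sas_paths()
print(len(paths))
print(paths[3])
```

Each path can then be passed to whichever reader you choose, and the monthly results combined.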

**What is a Form I-94?**
- Form I-94 is the DHS Arrival/Departure Record issued to aliens who are admitted to the U.S., who are adjusting status while in the U.S., or who are extending their stay, among other things. 
- A CBP officer generally attaches the I-94 to the non-immigrant visitor's passport upon U.S. entry.
<br><br>

<a href='https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data'><b>World Temperature Data:</b></a>
- This dataset came from Kaggle. 
<br><br>

<a href='https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/'><b>U.S. City Demographic Data:</b></a>
- This data comes from OpenDataSoft. 
- This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. 
- This data comes from the US Census Bureau's **2015** American Community Survey.
<br><br>

<a href='https://datahub.io/core/airport-codes#data'><b>Airport Code Table:</b></a>
- This is a simple table of airport codes and corresponding cities.

**To read from a local machine**, <br>
download the file 'i94_apr16_sub.sas7bdat'. <br>
- <font color='red'>Running the project on a local machine is very memory-intensive.</font><br>
- It is better to use the Udacity workspace.

>#### 1- Immigration Data Sample

In [2]:
# Read in the data here
t0 = time()
df = pd.read_csv('input_data/immigration_data_sample.csv')
df.drop(columns=['Unnamed: 0'], inplace=True)
print(df.shape)
print(f"Duration: {time()-t0} Seconds")

with pd.option_context('display.max_columns', None):
    display(df.head(2))

(1000, 28)
Duration: 0.05954098701477051 Seconds


Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2


<br>

>#### 2- Temperature Data

In [3]:
with ZipFile("input_data/data2.zip") as zipf:
    with zipf.open("data2/GlobalLandTemperaturesByCity.csv", "r") as f:
        df_temp = pd.read_csv(f)
        
print(df_temp.shape)
df_temp.head()

(8599212, 7)


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


<br>

>#### 3- Airport Codes

In [4]:
df_port = pd.read_csv('input_data/airport-codes_csv.csv')
print(df_port.shape)
df_port.head(1)

(55075, 12)


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"


<br>

>#### 4- US Cities Demographics

In [5]:
df_demo = pd.read_csv('input_data/us-cities-demographics.csv', delimiter=';')
print(df_demo.shape)
df_demo.head(2)

(2891, 12)


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723


<br>

### <a href='https://github.com/saurfang/spark-sas7bdat'>Spark SAS Data Source (sas7bdat --> SAS V7 and Beyond File Extension for a data set)</a>
- A library for reading SAS data (.sas7bdat) with Spark. 
- This package **allows reading SAS binary files (.sas7bdat) in parallel** as a data frame in Spark SQL. 
- It provides a utility to export the data as CSV (using spark-csv) or as a parquet file.
- **Note:** this library does not appear to be maintained; consider using another one for production code.
<br><br>
https://stackoverflow.com/questions/73890143/get-py4j-protocol-py4jjavaerror-java-lang-noclassdeffounderror-scala-productcm <br>
https://documentation.sas.com/doc/en/pgmsascdc/9.4_3.5/hostwin/n0sk6o15955yoen19n9ghdziqw1u.htm <br>
https://fileinfo.com/extension/sas7bdat

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

22/11/22 03:29:31 WARN Utils: Your hostname, Mahmouds-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.3 instead (on interface en0)
22/11/22 03:29:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
https://repos.spark-packages.org/ added as a remote repository with the name: repo-1


:: loading settings :: url = jar:file:/Users/mnagy99/opt/anaconda3/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/mnagy99/.ivy2/cache
The jars for the packages stored in: /Users/mnagy99/.ivy2/jars
saurfang#spark-sas7bdat added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-65daf489-c02e-4330-988b-a8361cfd0aa0;1.0
	confs: [default]
	found saurfang#spark-sas7bdat;2.0.0-s_2.11 in spark-packages
	found com.epam#parso;2.0.8 in central
	found org.slf4j#slf4j-api;1.7.5 in central
	found org.apache.logging.log4j#log4j-api-scala_2.11;2.7 in central
	found org.scala-lang#scala-reflect;2.11.8 in central
:: resolution report :: resolve 398ms :: artifacts dl 12ms
	:: modules in use:
	com.epam#parso;2.0.8 from central in [default]
	org.apache.logging.log4j#log4j-api-scala_2.11;2.7 from central in [default]
	org.scala-lang#scala-reflect;2.11.8 from central in [default]
	org.slf4j#slf4j-api;1.7.5 from central in [default]
	saurfang#spark-sas7bdat;2.0.0-s_2.11 from spark-packages in [default]
	------------------------------------------------------

In [7]:
spark

<br>

#### Immigration Data
- There's a file for each month of the year.
- **Note:** these files are large, so you'll have to think about how to process and aggregate them efficiently.
<br><br>
- The most important decision for modeling with this data is thinking about the **level of aggregation**. 
- Do you want to aggregate by airport by month? Or **by city by year?** 
- This level of aggregation will influence how you join the data with other datasets. 
- There isn't a right answer; it all depends on what you want your final dataset to look like.
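
To make the aggregation choice concrete, here is a minimal plain-Python sketch that counts arrivals at two different grains. The records are hypothetical `(i94port, i94mon)` pairs made up for illustration; on the full dataset the same idea would be expressed as a `groupBy(...).count()` in Spark.

```python
from collections import Counter

# Hypothetical (i94port, i94mon) pairs, for illustration only.
arrivals = [("NYC", 4), ("NYC", 4), ("LOS", 4), ("NYC", 5), ("LOS", 5), ("LOS", 5)]

# Fine grain: one row per port per month.
by_port_month = Counter(arrivals)

# Coarse grain: one row per port, with the months collapsed.
by_port = Counter(port for port, _month in arrivals)

print(by_port_month[("NYC", 4)])
print(by_port["LOS"])
```

The finer grain keeps the month available as a join key, while the coarser one can only be joined on the port.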

In [8]:
# t0 = time()
# fname = "../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat"
# df_spark = spark.read.format('com.github.saurfang.sas.spark').load(fname)
# print(f"Count: {df_spark.count()} Records")
# print(f"Duration: {time()-t0} Seconds")

# # df_spark.printSchema()
# df_spark.limit(2).toPandas()

#### or

In [9]:
# # Using Pandas.read_sas --> Slowest Read
# t0 = time()
# fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
# df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")
# print(df.shape)
# print(f"Duration: {time()-t0} Seconds")

# df.head(2)

#### or

In [10]:
#write to parquet --> Fastest Read
# Apache Parquet:
# is an open source, column-oriented data file format 
# designed for efficient data storage and retrieval.

t0 = time()
# df_spark.write.parquet("sas_data")
df_spark=spark.read.parquet("input_data/sas_data")
print(f"Count: {df_spark.count()} Records")
print(f"Duration: {time()-t0} Seconds")

df_spark.printSchema()
df_spark.limit(2).toPandas()

                                                                                

Count: 3096313 Records
Duration: 6.9363932609558105 Seconds
root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airlin

22/11/22 03:29:44 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,...,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1
1,5748518.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,NV,20591.0,...,,M,1984.0,10292016,F,,VA,94955620000.0,7,B1


<br>

<a id='step2'></a>
### Step 2: Explore and Assess the Data
<ul>
 <li><a href="#intro">Project Summary</a></li>
 <li><a href="#step1">Step 1: Scope the Project and Gather Data</a></li>
 <li><a href="#step2"><b>Step 2: Explore and Assess the Data</b></a></li>
 <li><a href="#step3">Step 3: Define the Data Model</a></li>
 <li><a href="#step4">Step 4: Run ETL to Model the Data</a></li>
 <li><a href="#step5">Step 5: Complete Project Write Up</a></li>
 </ul>

#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

**Data Transformation and Loading:**

**fact_immigration:**
- Convert Dates (sas / string) to DateTime
- Add Visa Categories (1=Business - 2=Pleasure - 3=Student)
- Add travel modes (1=Air - 2=Sea - 3=Land - 9=Not Reported)
- Write to parquet
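
The two code-to-label steps above amount to dictionary lookups. The following is a small sketch of that mapping in plain Python; `decode` and its `"Unknown"` fallback are hypothetical names chosen for illustration, not part of the project code.

```python
# Code-to-label mappings as listed in the cleaning steps above.
VISA_CATEGORIES = {1: "Business", 2: "Pleasure", 3: "Student"}
TRAVEL_MODES = {1: "Air", 2: "Sea", 3: "Land", 9: "Not Reported"}


def decode(code, mapping, default="Unknown"):
    """Map a numeric code (stored as a float such as 2.0) to its label.

    `default` is a hypothetical fallback for missing or unlisted codes.
    """
    if code is None or code != code:  # None or NaN
        return default
    return mapping.get(int(code), default)


print(decode(2.0, VISA_CATEGORIES))
print(decode(9.0, TRAVEL_MODES))
print(decode(None, TRAVEL_MODES))
```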

**dim_time:**
- Collect all arrival dates from the immigration dataset
- Extract the year, month, day, and week from each date and insert the values into the dim_time table
- Write to parquet
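
As a rough illustration of the dim_time step, the sketch below derives the date parts for a single arrival date. It assumes the week is the ISO week number, and the dictionary keys are hypothetical column names; the actual table may use a different week convention or naming.

```python
from datetime import date


def time_row(d):
    """Build one dim_time row from a date; week is the ISO week number."""
    return {
        "date": d,
        "year": d.year,
        "month": d.month,
        "day": d.day,
        "week": d.isocalendar()[1],
    }


row = time_row(date(2016, 4, 22))
print(row)
```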

**dim_city_demographics:**
- Rename Columns
- Write to parquet

|Character|Description|
|-|:-|
| \ |Signals a special sequence (can also be used to escape special characters)|
| ( ) |Capture and group|
| . | Any character (except newline character)|
| * | Zero or more occurrences |

\'(.*)\' --> Any group of characters between single quotes
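
A short sketch of how two such quoted groups can be pulled out of one line follows. The sample lines are hypothetical and only shaped like entries in the label file (the exact spacing around `=` is an assumption); lines without a quoted pair are skipped instead of raising an error.

```python
import re

# Two quoted groups on one line: the code, then its description.
PAIR = re.compile(r"\'(.*)\'.*\'(.*)\'")

# Hypothetical lines shaped like the entries in the SAS label file.
sample_lines = [
    "   'ABE'\t=\t'ABERDEEN, WA          '",
    "   'ABG'\t=\t'ALBURG, VT            '",
    "no quoted pair on this line",
]

parsed = {}
for line in sample_lines:
    match = PAIR.search(line)
    if match is None:  # skip lines that do not contain a code/label pair
        continue
    parsed[match.group(1)] = match.group(2).strip()

print(parsed)
```

Stripping the captured description removes the trailing padding that the label file keeps inside the quotes.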

In [207]:
# Create list of valid ports
i94_sas_label_descriptions_fname = "I94_SAS_Labels_Descriptions.SAS"
with open(i94_sas_label_descriptions_fname) as f:
    lines = f.readlines()

re_obj = re.compile(r"\'(.*)\'.*\'(.*)\'")
valid_ports = {}
for line in lines[302:961]:
    results = re_obj.search(line)
#     valid_ports[results[1]]=[results[2]]
    valid_ports[results.group(1)] = results.group(2)
print(len(valid_ports))
# pprint(valid_ports)

659


{<br>
'ABE': 'ABERDEEN, WA          ', <br>
'ABG': 'ALBURG, VT            ', <br>
}

### Check invalid ports in our data

In [215]:
# print(df_spark.filter(~df_spark.i94port.isin(list(valid_ports.keys()))).count())
# df_spark.filter(~df_spark.i94port.isin(list(valid_ports.keys()))).select('i94port').show()

df_spark.filter(~df_spark.i94port.isin(list(valid_ports.keys()))).groupBy('i94port').count().show()

+-------+-----+
|i94port|count|
+-------+-----+
|    INT|   32|
+-------+-----+



In [246]:
port_df = pd.DataFrame(valid_ports.items(), columns=['port', 'location'])
port_df.location = port_df.location.str.strip().str.split(',')
print(port_df.shape)
port_df.head(2)

(659, 3)


Unnamed: 0,port,location,city
0,ALC,"[ALCAN, AK]",ALCAN
1,ANC,"[ANCHORAGE, AK]",ANCHORAGE


In [250]:
port_df[port_df.location.apply(lambda x: len(x)) != 2]

Unnamed: 0,port,location,city,state
28,MAP,[MARIPOSA AZ],MARIPOSA AZ,
49,BLT,"[PACIFIC, HWY. STATION, CA]",PACIFIC,
76,WAS,[WASHINGTON DC],WASHINGTON DC,
217,WSB,"[WARROAD INTL, SPB, MN]",WARROAD INTL,
385,SPF,"[BLACK HILLS, SPEARFISH, SD]",BLACK HILLS,
...,...,...,...,...
654,NK,[No PORT Code (NK)],,
655,ADU,[No PORT Code (ADU)],,
656,AKT,[No PORT Code (AKT)],,
657,LIT,[No PORT Code (LIT)],,


In [249]:
port_df['city'] = port_df.location.apply(lambda x: x[0] if not x[0].startswith('No PORT') else 'N/A')
port_df['state'] = port_df.location.apply(lambda x: x[1] if len(x)==2 else 'N/A')

<br>
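
The irregular locations listed above (no comma, or more than one comma) suggest splitting on the last comma only. The sketch below is an alternative to splitting on every comma, not the approach used in the cells above; `split_location` is a hypothetical helper.

```python
def split_location(location):
    """Split 'CITY, ST' on the last comma and return (city, state).

    'N/A' is used when no state can be separated out.
    """
    text = location.strip()
    if text.startswith("No PORT"):
        return "N/A", "N/A"
    city, sep, state = text.rpartition(",")
    if not sep:  # no comma at all, e.g. 'MARIPOSA AZ'
        return text, "N/A"
    return city.strip(), state.strip()


for value in ["ALCAN, AK", "PACIFIC, HWY. STATION, CA", "MARIPOSA AZ", "No PORT Code (NK)"]:
    print(value, "->", split_location(value))
```

Splitting on the last comma keeps multi-part city names intact and also strips the space that follows the comma before the state code.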

In [196]:
# Create list of valid states
valid_states = df_demo["State Code"].unique()
print(len(valid_states))
print(valid_states)

49
['MD' 'MA' 'AL' 'CA' 'NJ' 'IL' 'AZ' 'MO' 'NC' 'PA' 'KS' 'FL' 'TX' 'VA'
 'NV' 'CO' 'MI' 'CT' 'MN' 'UT' 'AR' 'TN' 'OK' 'WA' 'NY' 'GA' 'NE' 'KY'
 'SC' 'LA' 'NM' 'IA' 'RI' 'PR' 'DC' 'WI' 'OR' 'NH' 'ND' 'DE' 'OH' 'ID'
 'IN' 'AK' 'MS' 'HI' 'SD' 'ME' 'MT']




<a id='imm'></a>
>#### 1- Immigration Data Sample

<ul>
 <li><a href="#imm"><b>1- Immigration Data Sample</b></a></li>
 <li><a href="#temp">2- Temperature Data</a></li>
 <li><a href="#air">3- Airport Codes</a></li>
 <li><a href="#demo">4- US Cities Demographics</a></li>
 </ul>


In [11]:
# for col in df.columns:
#     print(col)

In [12]:
# # Read in the data here
# t0 = time()
# df = pd.read_csv('input_data/immigration_data_sample.csv')
# df.drop(columns=['Unnamed: 0'], inplace=True)
# print(df.shape)
# print(f"Duration: {time()-t0} Seconds")

with pd.option_context('display.max_columns', None):
    display(df.head(2))

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2


In [13]:
df.insnum.value_counts().head()

3668.0    3
3943.0    3
3576.0    2
3517.0    2
3882.0    1
Name: insnum, dtype: int64

In [14]:
df.occup.value_counts().head()

STU    2
PHA    1
OTH    1
Name: occup, dtype: int64

In [15]:
df.entdepu.value_counts().head()

Series([], Name: entdepu, dtype: int64)

In [16]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 28 columns):
 #   Column    Non-Null Count  Dtype  
---  ------    --------------  -----  
 0   cicid     1000 non-null   float64
 1   i94yr     1000 non-null   float64
 2   i94mon    1000 non-null   float64
 3   i94cit    1000 non-null   float64
 4   i94res    1000 non-null   float64
 5   i94port   1000 non-null   object 
 6   arrdate   1000 non-null   float64
 7   i94mode   1000 non-null   float64
 8   i94addr   941 non-null    object 
 9   depdate   951 non-null    float64
 10  i94bir    1000 non-null   float64
 11  i94visa   1000 non-null   float64
 12  count     1000 non-null   float64
 13  dtadfile  1000 non-null   int64  
 14  visapost  382 non-null    object 
 15  occup     4 non-null      object 
 16  entdepa   1000 non-null   object 
 17  entdepd   954 non-null    object 
 18  entdepu   0 non-null      float64
 19  matflag   954 non-null    object 
 20  biryear   1000 non-null   float

In [17]:
print(f"{df.i94addr.nunique()} States")

51 States


In [18]:
# df['visapost'].value_counts().head()

In [19]:
df['count'].value_counts()

1.0    1000
Name: count, dtype: int64

In [20]:
# df.i94addr.value_counts()

In [21]:
# df.i94port.value_counts()

<br>
https://dhhr.wv.gov/bcf/Services/familyassistance/PolicyManual/Documents/Chapter%2018/ch18_1.pdf

- I94CIT & I94RES: the `i94cntyl` value format lists all the valid and invalid codes for processing.
- cntyl --> Country of Living (I think!)

| - | Col | Description |
|-|-:|:-|
|1|cicid|Application number / Citizenship and Immigration C...|
|**2**|**i94yr**|**4 digit year (Application issue or arrival year, I think)**|
|**3**|**i94mon**|**Numeric month (Application issue or arrival month, I think)**|
|4|i94cit|Country Immigrant is Originally From (country of citizenship, I think!)|
|5|i94res|Country of Immigrant Residence (country of residence, I think!)|
|6|i94port|AIR / SEAPORT of entry into the US (I think!)<br> ('XXX': 'NOT REPORTED/UNKNOWN' - '888': 'UNIDENTIFED AIR / SEAPORT' -'UNK': 'UNKNOWN POE')|
|**7**|**arrdate**|**Arrival Date in the USA (SAS date)**|
|8|i94mode| (1: 'Air' - 2: 'Sea' - 3: 'Land' -  9: 'Not reported') |
|9|i94addr|U.S. State / Address of Immigrant Inside USA (I think!) <br> ('99'='All Other Codes') <br> actually representing the final address of the migrants, that is where they currently live in the US.|
|**10**|**depdate**|**Departure Date from the USA (SAS date)**|
|**11**|**i94bir**|**Age of Respondent in Years**|
|12|i94visa|Visa codes collapsed into three categories <br> (Business - Pleasure - Student)|
|13|count|Used for summary statistics|
|**14**|**dtadfile**|**Character Date Field - Date added to I-94 Files - CIC does not use**|
|15|visapost|Department of State where Visa was issued - CIC does not use <br> This is where your visa was issued. It will be a U.S. embassy or U.S. consulate.|
|16|occup|Occupation that will be performed in U.S. - CIC does not use|
|17|entdepa|Arrival Flag - admitted or paroled into the U.S. - CIC does not use|
|18|entdepd|Departure Flag - Departed, lost I-94 or is deceased - CIC does not use|
|19|entdepu|Update Flag - Either apprehended, overstayed, adjusted to perm residence - CIC does not use|
|20|matflag|Match flag - Match of arrival and departure records|
|**21**|**biryear**|**4 digit year of birth**|
|**22**|**dtaddto**|**Character Date Field - Date to which admitted to U.S. (allowed to stay until) - CIC does not use <br>  visa expiration date  <br>**|
|23|gender|Non-immigrant sex|
|24|insnum|INS number (Immigration and Naturalization Service)|
|25|airline|Airline used to arrive in U.S.|
|26|admnum|Admission Number - An 11-digit number assigned to an alien when they enter the United States.|
|27|fltno|Flight number of Airline used to arrive in U.S.|
|28|visatype|VISATYPE - Class of admission legally admitting the non-immigrant to temporarily stay in U.S.|

<br>

In [22]:
df.visapost.value_counts().head()

# This is where your visa was issued. It will be a U.S. embassy or U.S. consulate.

MEX    28
BNS    21
BGT    14
SPL    14
GUZ    13
Name: visapost, dtype: int64

In [23]:
df.visatype.value_counts()

WT     443
B2     356
WB      91
B1      61
GMT     27
F1      10
CP       5
E2       3
F2       3
M1       1
Name: visatype, dtype: int64

<br>

https://visaservices.duke.edu/categories/b2-wt
- **B-1** visitor for business or medical treatment (valid for up to a year)
    - The stay is usually 6 months, and you can apply to extend it for another 6 months. The reason for the extension must be consistent with the terms and conditions of your original status.
    - **WB Visa Waiver for Business:**
        - Travel to the United States for business for stays of 90 days or less without obtaining a visa.
- **B-2** visitor for pleasure, tourism, visiting friends or relatives (valid for up to a year)
    - This type of visa is normally issued for a period of 1 month to 10 years. The visitor is allowed to stay up to 6 months per entry and can apply to extend the stay in the U.S.
    - **WT Visa Waiver for Tourism:** 
        - Travel to the United States for tourism for stays of 90 days or less without obtaining a visa. 
- **F-1** 
    - available only to full-time, enrolled students working toward a degree, certificate or specified course of study at a United States institution of higher education.
- **F-2** 
    - F-2 visas allow spouses and minor children of F-1 students to enter the U.S. to live with the F-1 student for the duration of the educational program. However, this does not allow you to work in the U.S. or study full-time.
- **GMT** Global Marine Travel
- **CP** Continued Presence
- **E2** 
    - The E-2 Investor Visa allows an individual to enter and work in the United States based on an investment in a U.S. business. The E2 visa is valid for three months to five years and can be extended indefinitely. 
    - The investment must be "substantial", although there is no legally defined minimum.
- **M-1**
    - The M-1 visa is a type of student visa in the U.S. reserved for international students attending vocational schools and technical schools. 
    - non-academic or “vocational study” --> Mechanical, language, cooking classes, etc...

<br>

https://www.siam-legal.com/us-visa/how-long-will-your-u-s-visa-allow-you-to-stay-in-the-u-s/ <br>
The visa expiration date is shown on the visa along with the visa issuance date. The time between visa issuance and expiration date is called your visa validity.

**How long does a visa last?**
Usually 6 months, and you can apply to extend the stay for another 6 months. The reason for the extension must be consistent with the terms and conditions of your original status. This type of visa is normally issued for a period of 1 month to 10 years.

**B1 Visa** – visitor for business or medical treatment

**B2 Visa** – visitor for pleasure, tourism, visiting friends or relatives

**D Visa** – crew members
- If the travel purpose is crew service, the length of stay allowed in the U.S. is a maximum of 29 days. No extensions of stay or changes to another status are allowed.

**F Visa** – academic students
- Holders of this visa may remain in the U.S. as long as they remain enrolled in full-time study or in an educational program at an approved school. Conditions include making normal progress towards completing the course of study and complying with all the terms of the visa category. There is a 60-day grace period to prepare to leave the United States. No extension application is needed.

<br>

# Immigration DATA Wrangling with Pandas

In [24]:
# Note: This is just a Sample
df.cicid.isnull().sum()

0

In [25]:
# Note: This is just a Sample
df.cicid.duplicated().sum()

0

https://knowledge.udacity.com/questions/552714 <br>
**There are two factors affecting ADMNUM.**
- ADMNUM (Admission Number) is issued to an individual each time they enter the country.
- An individual cannot enter the same country twice with the same admission number, so you may want to check for duplicates.

In [26]:
# Note: This is just a Sample
df.admnum.duplicated().sum()

0

<br>

In [27]:
(df['i94cit'] == df['i94res']).sum()

826

In [28]:
df[df['i94cit'] != df['i94res']].head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
2,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,...,,M,1940.0,7052016,M,,LH,55780470000.0,464,WT
7,232708.0,2016.0,4.0,113.0,135.0,NYC,20546.0,1.0,NY,20554.0,...,,M,1983.0,6302016,F,,BA,55474490000.0,117,WT
12,2711583.0,2016.0,4.0,148.0,112.0,FTL,20559.0,2.0,,20565.0,...,,M,1962.0,7132016,F,,VES,56175860000.0,93724,WT
14,1387607.0,2016.0,4.0,148.0,112.0,BOS,20552.0,1.0,MA,20560.0,...,,M,1982.0,7062016,F,,AF,55833390000.0,338,WT
18,4668286.0,2016.0,4.0,746.0,158.0,SEA,20568.0,1.0,NV,20571.0,...,,M,1970.0,10232016,M,,DL,94435600000.0,143,B2


<br>

#### <font color='red'>Convert Dates (sas / string) to DateTime</font>

In [29]:
# Convert Dates from SAS to DateTime
df['arrdate'] = pd.to_datetime(df['arrdate'], unit='D', origin='1960-01-01')
df['depdate'] = pd.to_datetime(df['depdate'], unit='D', origin='1960-01-01')

with pd.option_context('display.max_columns', None):
    display(df.head())

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,2016-04-22,1.0,HI,2016-04-29,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,2016-04-23,1.0,TX,2016-04-24,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,1195600.0,2016.0,4.0,148.0,112.0,OGG,2016-04-07,1.0,FL,2016-04-27,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,5291768.0,2016.0,4.0,297.0,297.0,LOS,2016-04-28,1.0,CA,2016-05-07,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,985523.0,2016.0,4.0,111.0,111.0,CHM,2016-04-06,3.0,NY,2016-04-09,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT
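
The conversion above works because a SAS date is a count of days since 1960-01-01. The sketch below performs the same arithmetic with the standard library only; `sas_to_date` is a hypothetical helper, and returning `None` for missing values is an assumption about how gaps should be handled.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day


def sas_to_date(days):
    """Convert a SAS day count (possibly a float such as 20566.0) to a date."""
    if days is None or days != days:  # None or NaN: no date recorded
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_to_date(20566.0))
print(sas_to_date(20573.0))
```

The two inputs are the `arrdate` and `depdate` values of the first sample row, so the results can be compared with the converted table.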


In [30]:
df['dtaddto'].isnull().sum()

0

In [31]:
# Non-date values in dtaddto: 'D/S' means "Duration of Status", not a calendar date
mask = df['dtaddto'].apply(lambda x: len(str(x))) < 8
df['dtaddto'][mask]

70     D/S
238    D/S
274    D/S
337    D/S
415    D/S
538    D/S
591    D/S
615    D/S
621    D/S
684    D/S
791    D/S
934    D/S
964    D/S
Name: dtaddto, dtype: object

In [32]:
# df[mask]
df[df['dtaddto'] == 'D/S'].head(2)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
70,3599863.0,2016.0,4.0,582.0,582.0,HOU,2016-04-19,1.0,TX,2016-07-04,...,,M,1974.0,D/S,F,,WN,93991160000.0,2831,F1
238,513953.0,2016.0,4.0,268.0,268.0,LOS,2016-04-03,1.0,CA,2016-06-22,...,,M,1997.0,D/S,M,,BR,92685610000.0,12,F1


In [33]:
# df = df[~(df['dtaddto'] == 'D/S')]
# df['dtaddto'][mask]

In [34]:
# https://stackoverflow.com/questions/27506367/python-pandas-integer-yyyymmdd-to-datetime

# dtadfile is stored as YYYYMMDD
df['dtadfile'] = pd.to_datetime(df['dtadfile'].astype(str), format='%Y%m%d').dt.date

# dtaddto is stored as MMDDYYYY; non-date values such as 'D/S' become NaT
df['dtaddto'] = pd.to_datetime(df['dtaddto'].astype(str), format='%m%d%Y', errors='coerce').dt.date

with pd.option_context('display.max_columns', None):
    display(df.head(2))

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,2016-04-22,1.0,HI,2016-04-29,61.0,2.0,1.0,2016-04-22,,,G,O,,M,1955.0,2016-07-20,F,,JL,56582670000.0,00782,WT
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,2016-04-23,1.0,TX,2016-04-24,26.0,2.0,1.0,2016-04-23,MTR,,G,R,,M,1990.0,2016-10-22,M,,*GA,94362000000.0,XBLNG,B2


<br>

In [35]:
# df.isnull().sum()

<br>

# Immigration DATA Wrangling with SPARK

In [36]:
df_spark.createOrReplaceTempView('immigration')

### <font color='red'>Check for NaNs</font>

https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe

- `isnan()` is a function in the `pyspark.sql.functions` module, so the column to test is passed as an argument to the function. 
- `isNull()` is a method of the `pyspark.sql.Column` class, so it is called on the column itself: `yourColumn.isNull()`. 

https://medium.com/@allenhuang1996/whats-the-difference-between-null-and-nan-in-python-a1af20d523ce <br>
- NaN: "Not a Number", a special floating-point value.
- None: a Python object that represents the absence of a value.
- Pandas automatically converts None to NaN in numeric data.

- None means "nothing": the absence of any value at all.
- Nothing is sometimes confused with null, but they are different concepts: nothing means the absence of anything, while null means unknown (you do not know whether there is a value or not).
- In some other languages null is often defined to be 0, but Python is different: it uses the keyword None to define null objects and variables.
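
The distinction can be checked directly in plain Python. The sketch below uses only the standard library; `is_missing` is a hypothetical helper that treats both markers as missing, which mirrors the combined `isnan(...) | isNull()` test used for the Spark columns.

```python
import math

nan = float("nan")

# NaN is a float and is not equal to anything, including itself.
print(isinstance(nan, float))
print(nan == nan)
print(math.isnan(nan))

# None is a singleton object; test for it with `is`.
missing = None
print(missing is None)


def is_missing(value):
    """True for None or NaN, the two 'missing' markers discussed above."""
    return value is None or (isinstance(value, float) and math.isnan(value))


print([is_missing(v) for v in (None, nan, 0.0, "")])
```

Because NaN never compares equal to itself, an equality test cannot find it; `math.isnan` (or `isnan` in Spark) is needed instead.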

In [37]:
from pyspark.sql.functions import col, isnan, when, count, desc

print(f"Count: {df_spark.count()} Records")

with pd.option_context('display.max_columns', None):
    display(df_spark.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_spark.columns]).toPandas())

Count: 3096313 Records


                                                                                

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,0,0,0,0,0,0,0,239,152592,142457,802,0,0,1,1881250,3088187,238,138429,3095921,138429,802,477,414269,2982605,83627,0,19549,0


In [38]:
with pd.option_context('display.max_columns', None):
    display(df_spark.select([count(when(isnan(c), c)).alias(c) for c in df_spark.columns]).toPandas())

                                                                                

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


<br>

In [39]:
with pd.option_context('display.max_columns', None):
    display(spark.sql("""
                SELECT *
                FROM immigration
                LIMIT 1
    """).toPandas())

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,5748517.0,2016.0,4.0,245.0,438.0,LOS,20574.0,1.0,CA,20582.0,40.0,1.0,1.0,20160430,SYD,,G,O,,M,1976.0,10292016,F,,QF,94953870000.0,11,B1


In [40]:
spark.sql("""
    SELECT COUNT (DISTINCT cicid)
    FROM immigration
""").show()


+---------------------+
|count(DISTINCT cicid)|
+---------------------+
|              3096313|
+---------------------+





In [41]:
spark.sql("""
    SELECT DISTINCT(count)
    FROM immigration
    LIMIT 10
""").show()

+-----+
|count|
+-----+
|  1.0|
+-----+





> The column "count" contains only one value, "1.0".

<br>

### <font color='red'>Convert Dates (sas / string) to DateTime</font>
https://sparkbyexamples.com/pyspark/pyspark-to_date-convert-string-to-date-format/ <br>
https://knowledge.udacity.com/questions/757001 <br>

https://knowledge.udacity.com/questions/381099 <br>
Yes, that is correct. A udf is not needed in this case since the function is quite simple. Check out the to_date documentation to learn more about it: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.to_date <br>  
**Avoid udf whenever possible, since by using udfs you may end up losing all the optimization Spark does on the Dataframe/Dataset.**

In [42]:
# A SAS date is the number of days since 1960-01-01,
# so all we need to do is add "timedelta(days=int(x))" to "datetime(1960, 1, 1)".
# https://libguides.library.kent.edu/SAS/DatesTime
    
def convert_dt(x):
    try:
        start = datetime(1960, 1, 1)
        return start + timedelta(days=int(x))
    except (TypeError, ValueError, OverflowError):
        # x is None (TypeError), NaN (ValueError) or out of range (OverflowError)
        return None
    
# get_date = F.udf(lambda x: convert_dt(x), T.DateType())
get_date = F.udf(convert_dt, T.DateType()) # no need for lambda

df_spark.select('arrdate', get_date('arrdate').alias("date")).show(3)
## Arrival date conversion


+-------+----------+
|arrdate|      date|
+-------+----------+
|20574.0|2016-04-30|
|20574.0|2016-04-30|
|20574.0|2016-04-30|
+-------+----------+
only showing top 3 rows



                                                                                

In [43]:
# Alternatively, we can use pandas

def sas_to_datetime(date):
    if date is not None:
        return pd.to_timedelta(date, unit='D') + pd.Timestamp('1960-1-1')
    
# sas_to_datetime_udf = F.udf(lambda x: sas_to_datetime(x), T.DateType())
sas_to_datetime_udf = F.udf(sas_to_datetime, T.DateType()) # no need for lambda

df_spark.select('arrdate', sas_to_datetime_udf('arrdate').alias("date")).show(3)
## Arrival date conversion


+-------+----------+
|arrdate|      date|
+-------+----------+
|20574.0|2016-04-30|
|20574.0|2016-04-30|
|20574.0|2016-04-30|
+-------+----------+
only showing top 3 rows



                                                                                

In [44]:
df_spark.select('depdate', get_date('depdate').alias("departure_date")).show(3)
## Departure date conversion

+-------+--------------+
|depdate|departure_date|
+-------+--------------+
|20582.0|    2016-05-08|
|20591.0|    2016-05-17|
|20582.0|    2016-05-08|
+-------+--------------+
only showing top 3 rows



<br>

https://spark.apache.org/docs/2.3.0/api/sql/index.html#date_add <br>
All dates in SAS are stored as the number of days since 1960-01-01. Therefore, we compute the arrival date by adding arrdate (in days) to 1960-01-01.
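
As a quick sanity check of this arithmetic outside Spark, here is a minimal standard-library sketch. `sas_days_to_date` and `SAS_EPOCH` are hypothetical names introduced for illustration; returning `None` for missing input is an assumption of the sketch.

```python
import math
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS calendar

def sas_days_to_date(days):
    """Convert a SAS day count (days since 1960-01-01) to a date, or None if missing."""
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_days_to_date(20574.0))  # arrdate value from the sample row
print(sas_days_to_date(None))
```

The Spark expression `date_add('1960-01-01', cast(arrdate as int))` used in the cells below performs the same addition on whole columns without a udf.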

In [45]:
# df_spark = spark.sql("SELECT *, date_add(to_date('1960-01-01'), cast(arrdate as int)) AS arrival_date FROM immigration")
# df_spark.createOrReplaceTempView("immigration2")

# https://stackoverflow.com/questions/71108605/recursive-view-error-while-using-spark-3-2-0-version
# It works with 3.0.1 but looks like the latest version has a bug

In [46]:
# USING SPARK SQL
# ----------------
# # to_date('1960-01-01') ==> is not needed
spark.sql("""
    SELECT *, date_add('1960-01-01', cast(arrdate as int)) AS arrival_date 
    FROM immigration
""").createOrReplaceTempView("immigration2")


# OR USING SPARK DATAFRAME
# -------------------------
# https://sparkbyexamples.com/pyspark/pyspark-typeerror-column-is-not-iterable/
# TypeError: Column is not iterable

# to modify arrdate in place
df_spark = df_spark.withColumn('arrdate', F.expr("date_add('1960-01-01', cast(arrdate as int))")) 
df_spark.select(col("arrdate").alias("arrival_date") ).show(3)

+------------+
|arrival_date|
+------------+
|  2016-04-30|
|  2016-04-30|
|  2016-04-30|
+------------+
only showing top 3 rows



In [47]:
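# Note: count(depdate) skips NULLs, so with "WHERE depdate IS NULL" this query always returns 0.
# Use count(*) to count the rows with a missing depdate.
spark.sql("""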
    SELECT count(depdate) 
    FROM immigration
    WHERE depdate IS NULL
""").show(3)

+--------------+
|count(depdate)|
+--------------+
|             0|
+--------------+
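
A result of 0 here does not show that `depdate` has no missing values: the earlier null check reported 142457 of them. `COUNT(column)` counts only non-NULL values, so filtering on `IS NULL` leaves it nothing to count. The sketch below demonstrates the behaviour with Python's built-in `sqlite3` module instead of Spark; the `visits` table and its three rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE visits (depdate REAL)")
conn.executemany("INSERT INTO visits VALUES (?)", [(20582.0,), (None,), (None,)])

# COUNT(column) skips NULLs, so after filtering on IS NULL nothing is left to count
count_column = conn.execute(
    "SELECT COUNT(depdate) FROM visits WHERE depdate IS NULL"
).fetchone()[0]

# COUNT(*) counts rows, so it reports the rows whose depdate is NULL
count_rows = conn.execute(
    "SELECT COUNT(*) FROM visits WHERE depdate IS NULL"
).fetchone()[0]

print(count_column, count_rows)
conn.close()
```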



In [48]:
# Since depdate contains null values

# USING SPARK SQL
# ----------------
sql_expr = """
CASE WHEN depdate >= 1.0 THEN date_add('1960-01-01', cast(depdate as int))
     WHEN depdate IS NULL THEN NULL
     ELSE 'N/A' 
END           
"""

spark.sql("SELECT *," + sql_expr + "AS departure_date FROM immigration2").createOrReplaceTempView("immigration3")


# OR USING SPARK DATAFRAME
# -------------------------
# https://sparkbyexamples.com/pyspark/pyspark-typeerror-column-is-not-iterable/
# TypeError: Column is not iterable

# # to modify depdate in place
df_spark = df_spark.withColumn('depdate', F.expr(sql_expr)) 
df_spark.select(col("depdate").alias("departure_date") ).show(3)

+--------------+
|departure_date|
+--------------+
|    2016-05-08|
|    2016-05-17|
|    2016-05-08|
+--------------+
only showing top 3 rows



In [49]:
#  df_spark.select("depdate", 
# when(df_spark.depdate >= 1, F.expr("date_add('1960-01-01', cast(depdate as int)) AS departure_date"))\
# .otherwise('N/A')).show(3)

#  df_spark.selectExpr("depdate", sql_expr).show(3)

#  df_spark.select("depdate", F.expr(sql_expr)).show(3)

In [50]:
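# Check for 'N/A' values produced by the previous query.
# Note: this filters on the raw depdate column; the derived column is departure_date.
spark.sql("SELECT count(*) FROM immigration3 WHERE immigration3.depdate = 'N/A'").show()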

+--------+
|count(1)|
+--------+
|       0|
+--------+



<br>

In [51]:
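# Note: count(dtaddto) skips NULLs, so this query always returns 0 (the null check above found 477 missing values).
spark.sql("""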
    SELECT count(dtaddto) 
    FROM immigration
    WHERE dtaddto IS NULL
""").show()

+--------------+
|count(dtaddto)|
+--------------+
|             0|
+--------------+



In [52]:
# Since dtaddto contains null values

# USING SPARK SQL
# ---------------
spark.sql("""
SELECT *, 
       CASE WHEN dtaddto >= 1 THEN to_date(dtaddto, "MMddyyyy")
            WHEN dtaddto IS NULL THEN NULL
            ELSE 'N/A' END AS visa_end_date 
FROM immigration3
""").createOrReplaceTempView("immigration4")


# OR USING SPARK DATAFRAME
# -------------------------
df_spark = df_spark.withColumn('dtaddto', F.to_date("dtaddto" ,"MMddyyyy")) # to modify dtaddto in place
df_spark.select(col("dtaddto").alias("visa_end_date") ).show(3)

+-------------+
|visa_end_date|
+-------------+
|   2016-10-29|
|   2016-10-29|
|   2016-10-29|
+-------------+
only showing top 3 rows
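
For reference, the same `MMddyyyy` parsing can be sketched in plain Python. `parse_mmddyyyy` is a hypothetical helper, not part of the notebook; mapping non-date placeholders such as "D/S" to `None` is a choice made for this sketch.

```python
from datetime import datetime

def parse_mmddyyyy(value):
    """Parse an 'MMddyyyy' string into a date; return None if it is missing or not a date."""
    if value is None:
        return None
    try:
        return datetime.strptime(value, "%m%d%Y").date()
    except ValueError:
        # placeholders such as 'D/S' do not match the pattern
        return None

print(parse_mmddyyyy("10292016"))  # dtaddto value from the sample row
print(parse_mmddyyyy("D/S"))
```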



In [53]:
# df_spark.select( col("dtadfile"), F.to_date(col("dtadfile") ,"yyyyMMdd").alias("date") ).show()

# We will not use date added to i94 file in the analysis

<br>

### <font color='red'>Add Visa Categories (Business - Pleasure - Student)</font>

In [54]:
# spark.sql("""
#     SELECT *, CASE 
#         WHEN i94visa = 1.0 THEN 'Business' 
#         WHEN i94visa = 2.0 THEN 'Pleasure'
#         WHEN i94visa = 3.0 THEN 'Student'
#         ELSE 'N/A' END AS visa_category 
                        
#     FROM immigration4
# """).createOrReplaceTempView("immigration5")

In [55]:
# USING SPARK SQL
# ----------------
sql_expr = """
CASE WHEN i94visa = 1.0 THEN 'Business' 
     WHEN i94visa = 2.0 THEN 'Pleasure'
     WHEN i94visa = 3.0 THEN 'Student'
     ELSE 'N/A' 
END              
"""

spark.sql("SELECT *," + sql_expr + "AS visa_category FROM immigration4").createOrReplaceTempView("immigration5")


# OR USING SPARK DATAFRAME
# -------------------------
# https://sparkbyexamples.com/pyspark/pyspark-typeerror-column-is-not-iterable/
# TypeError: Column is not iterable

# # to modify i94visa in place
df_spark = df_spark.withColumn('i94visa', F.expr(sql_expr)) 
df_spark.select(col("i94visa").alias("visa_category") ).show(3)

+-------------+
|visa_category|
+-------------+
|     Business|
|     Business|
|     Business|
+-------------+
only showing top 3 rows



In [56]:
spark.sql("""
SELECT visa_category, visatype, count(*)
FROM immigration5
GROUP BY visa_category, visatype
ORDER BY visa_category, visatype
""").show()

+-------------+--------+--------+
|visa_category|visatype|count(1)|
+-------------+--------+--------+
|     Business|      B1|  212410|
|     Business|      E1|    3743|
|     Business|      E2|   19383|
|     Business|     GMB|     150|
|     Business|       I|    3176|
|     Business|      I1|     234|
|     Business|      WB|  282983|
|     Pleasure|      B2| 1117897|
|     Pleasure|      CP|   14758|
|     Pleasure|     CPL|      10|
|     Pleasure|     GMT|   89133|
|     Pleasure|     SBP|      11|
|     Pleasure|      WT| 1309059|
|      Student|      F1|   39016|
|      Student|      F2|    2984|
|      Student|      M1|    1317|
|      Student|      M2|      49|
+-------------+--------+--------+



<br>

### <font color='red'>Check that departure_date > arrival_date</font>

In [57]:
spark.sql("""
    SELECT COUNT(*)
    FROM immigration5
    WHERE departure_date <= arrival_date
""").show()


+--------+
|count(1)|
+--------+
|     375|
+--------+





In [58]:
spark.sql("""
    SELECT arrival_date, departure_date, biryear, i94bir
    FROM immigration5
    WHERE departure_date <= arrival_date
""").show(10)

+------------+--------------+-------+------+
|arrival_date|departure_date|biryear|i94bir|
+------------+--------------+-------+------+
|  2016-04-30|    2016-04-29| 1989.0|  27.0|
|  2016-04-30|    2016-04-28| 1985.0|  31.0|
|  2016-04-30|    2016-04-29| 1979.0|  37.0|
|  2016-04-05|    2012-04-14| 1941.0|  75.0|
|  2016-04-05|    2016-03-14| 1948.0|  68.0|
|  2016-04-14|    2016-03-03| 1956.0|  60.0|
|  2016-04-01|    2016-02-28| 1966.0|  50.0|
|  2016-04-04|    2016-03-07| 2011.0|   5.0|
|  2016-04-01|    2016-03-05| 1975.0|  41.0|
|  2016-04-05|    2016-03-07| 1934.0|  82.0|
+------------+--------------+-------+------+
only showing top 10 rows



>- We cannot fix these errors: nothing in the data tells us which of the two dates is wrong.

In [59]:
# # >- Since the number of affected rows is relatively small, we'll simply drop the rows
# spark.sql("""
#     SELECT *
#     FROM immigration5
#     WHERE departure_date >= arrival_date
# """).createOrReplaceTempView("immigration6")

<br>

### Check travel modes 
(1: 'Air' - 2: 'Sea' - 3: 'Land' - 9: 'Not reported')
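
The code-to-label mapping can also be expressed as a dictionary lookup. This is a plain-Python sketch with hypothetical names (`TRAVEL_MODES`, `decode_travel_mode`); it applies the same fallback to 'N/A' as the SQL `CASE` expression used below.

```python
TRAVEL_MODES = {1: "Air", 2: "Sea", 3: "Land", 9: "Not Reported"}

def decode_travel_mode(code, default="N/A"):
    """Map an i94mode code (possibly a float such as 1.0, None or NaN) to its label."""
    try:
        key = int(code)
    except (TypeError, ValueError):
        # None raises TypeError, NaN raises ValueError
        return default
    return TRAVEL_MODES.get(key, default)

print([decode_travel_mode(c) for c in (1.0, 2.0, 3.0, 9.0, None)])
```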

In [60]:
spark.sql("""
    SELECT i94mode, count(*)
    FROM immigration5
    GROUP BY i94mode
""").show()

+-------+--------+
|i94mode|count(1)|
+-------+--------+
|   null|     239|
|    1.0| 2994505|
|    3.0|   66660|
|    2.0|   26349|
|    9.0|    8560|
+-------+--------+



In [61]:
# spark.sql("""
#     SELECT *, CASE 
#         WHEN i94mode = 1.0 THEN 'Air' 
#         WHEN i94mode = 2.0 THEN 'Sea'
#         WHEN i94mode = 3.0 THEN 'Land'
#         WHEN i94mode = 9.0 THEN 'Not Reported'
#         ELSE 'N/A' END AS travel_mode 
#     FROM immigration5
# """).createOrReplaceTempView("immigration6")

In [62]:
# USING SPARK SQL
# ----------------
sql_expr = """
CASE  WHEN i94mode = 1.0 THEN 'Air' 
      WHEN i94mode = 2.0 THEN 'Sea'
      WHEN i94mode = 3.0 THEN 'Land'
      WHEN i94mode = 9.0 THEN 'Not Reported'
      ELSE 'N/A'  
END              
"""

spark.sql("SELECT *," + sql_expr + "AS travel_mode FROM immigration5").createOrReplaceTempView("immigration6")


# OR USING SPARK DATAFRAME
# -------------------------
# https://sparkbyexamples.com/pyspark/pyspark-typeerror-column-is-not-iterable/
# TypeError: Column is not iterable

# # to modify i94mode in place
df_spark = df_spark.withColumn('i94mode', F.expr(sql_expr)) 
df_spark.select(col("i94mode").alias("travel_mode") ).show(3)

+-----------+
|travel_mode|
+-----------+
|        Air|
|        Air|
|        Air|
+-----------+
only showing top 3 rows



### <font color='red'>Fill missing age using birth year</font>

In [63]:
spark.sql("""
    SELECT COUNT(*)
    FROM immigration6
    WHERE i94bir IS NULL
""").show()

+--------+
|count(1)|
+--------+
|     802|
+--------+



In [64]:
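# Note: COUNT(biryear) skips NULLs, so this query always returns 0 (the null check above found 802 missing birth years).
spark.sql("""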
    SELECT COUNT(biryear) 
    FROM immigration6 
    WHERE biryear IS NULL
""").show()

+--------------+
|count(biryear)|
+--------------+
|             0|
+--------------+



In [65]:
spark.sql("""
    SELECT (2016-biryear)-i94bir AS difference, count(*) 
    FROM immigration6 
    WHERE i94bir IS NOT NULL 
    GROUP BY difference
""").show()

+----------+--------+
|difference|count(1)|
+----------+--------+
|       0.0| 3095511|
+----------+--------+



In [66]:
# USING SPARK SQL
# ----------------
spark.sql("""
    SELECT *, (2016 - biryear) AS age                        
    FROM immigration6
""").createOrReplaceTempView("immigration7")

# spark.sql("""
#     SELECT *, CASE 
#         WHEN biryear IS NOT NULL THEN (2016 - biryear) 
#         ELSE 'N/A' END AS age                        
#     FROM immigration6
# """).createOrReplaceTempView("immigration7")


# OR USING SPARK DATAFRAME
# -------------------------
# https://sparkbyexamples.com/pyspark/pyspark-typeerror-column-is-not-iterable/
# TypeError: Column is not iterable

# # to overwrite i94bir with the computed age
df_spark = df_spark.withColumn('i94bir', F.expr("2016 - biryear")) 
df_spark.select(col("i94bir").alias("age") ).show(3)

+----+
| age|
+----+
|40.0|
|32.0|
|29.0|
+----+
only showing top 3 rows



In [67]:
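# Note: COUNT(age) skips NULLs, so this query always returns 0; COUNT(*) would count the rows with a missing age.
spark.sql("""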
    SELECT COUNT(age) As age_missing
    FROM immigration7 
    WHERE age IS NULL
""").show()

+-----------+
|age_missing|
+-----------+
|          0|
+-----------+



In [68]:
spark.sql("SELECT MAX(biryear), MIN(biryear) FROM immigration6 WHERE biryear IS NOT NULL").show()

+------------+------------+
|max(biryear)|min(biryear)|
+------------+------------+
|      2019.0|      1902.0|
+------------+------------+



>- The 2019 birth year is an error in the data:
>- the data covers arrivals in 2016, so it produces a negative age (-3).
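
One possible way to guard against such values is sketched below in plain Python. `compute_age` is a hypothetical helper, and treating a birth year after the reference year as missing is an assumption of this sketch, not something the notebook does.

```python
def compute_age(biryear, reference_year=2016):
    """Return the age in years, or None if the birth year is missing or in the future."""
    if biryear is None:
        return None
    age = reference_year - int(biryear)
    # a negative age means the birth year lies after the reference year
    return age if age >= 0 else None

print(compute_age(1976.0))
print(compute_age(2019.0))
```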

In [69]:
spark.sql("""
    SELECT *
    FROM immigration7 
    WHERE biryear > 2016
""").toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,airline,admnum,fltno,visatype,arrival_date,departure_date,visa_end_date,visa_category,travel_mode,age
0,5952559.0,2016.0,4.0,252.0,209.0,AGA,20554.0,1.0,,,...,,57545310000.0,1,GMT,2016-04-10,,2016-05-24,Pleasure,Air,-3.0


In [70]:
# frequency of travellers by age
spark.sql("""
    SELECT age, COUNT(*)
    FROM immigration7 
    WHERE age IS NOT NULL
    GROUP BY age
    ORDER BY age DESC
""").show()

+-----+--------+
|  age|count(1)|
+-----+--------+
|114.0|       1|
|111.0|       1|
|110.0|       1|
|109.0|       2|
|108.0|       2|
|107.0|       1|
|105.0|       2|
|103.0|       1|
|102.0|       4|
|101.0|       2|
|100.0|      24|
| 99.0|      19|
| 98.0|      26|
| 97.0|      52|
| 96.0|      46|
| 95.0|      88|
| 94.0|     104|
| 93.0|     185|
| 92.0|     241|
| 91.0|     319|
+-----+--------+
only showing top 20 rows



In [71]:
# frequency of travellers by age
spark.sql("""
    SELECT age, COUNT(*)
    FROM immigration7 
    WHERE age IS NOT NULL
    GROUP BY age
    ORDER BY age 
""").show()

+----+--------+
| age|count(1)|
+----+--------+
|-3.0|       1|
| 0.0|     765|
| 1.0|   12747|
| 2.0|   14756|
| 3.0|   12704|
| 4.0|   14411|
| 5.0|   15129|
| 6.0|   15773|
| 7.0|   14233|
| 8.0|   14607|
| 9.0|   15368|
|10.0|   15745|
|11.0|   15971|
|12.0|   16840|
|13.0|   16490|
|14.0|   17435|
|15.0|   19401|
|16.0|   21153|
|17.0|   20326|
|18.0|   19117|
+----+--------+
only showing top 20 rows



#### Check columns to partition by

In [72]:
df_spark.groupBy(['i94cit', 'i94addr']).count().sort(desc('count')).show()


+------+-------+------+
|i94cit|i94addr| count|
+------+-------+------+
| 135.0|     FL|102647|
| 209.0|     HI| 97159|
| 111.0|     NY| 81776|
| 135.0|     NY| 72099|
| 689.0|     FL| 62244|
| 245.0|     CA| 57538|
| 687.0|     FL| 40878|
| 135.0|     CA| 39073|
| 254.0|     GU| 38901|
| 252.0|     GU| 38026|
| 582.0|     CA| 33941|
| 438.0|     CA| 33210|
| 148.0|     FL| 31517|
| 148.0|     NY| 30246|
| 111.0|     CA| 29768|
| 696.0|     FL| 29370|
| 582.0|     FL| 29272|
| 691.0|     FL| 27850|
| 582.0|     TX| 27773|
| 209.0|     CA| 27535|
+------+-------+------+
only showing top 20 rows



                                                                                

In [73]:
df_spark.groupBy(['i94cit']).count().sort(desc('count')).show()

+------+------+
|i94cit| count|
+------+------+
| 135.0|360157|
| 209.0|206873|
| 245.0|191425|
| 111.0|188766|
| 582.0|175781|
| 148.0|157806|
| 254.0|137735|
| 689.0|129833|
| 213.0|110691|
| 438.0|109884|
| 117.0| 78535|
| 123.0| 76920|
| 687.0| 69853|
| 129.0| 57224|
| 691.0| 54120|
| 130.0| 45269|
| 251.0| 41744|
| 692.0| 41349|
| 252.0| 41132|
| 696.0| 40785|
+------+------+
only showing top 20 rows



In [74]:
df_spark.groupBy(['i94addr']).count().sort(desc('count')).show()

+-------+------+
|i94addr| count|
+-------+------+
|     FL|621701|
|     NY|553677|
|     CA|470386|
|     HI|168764|
|   null|152592|
|     TX|134321|
|     NV|114609|
|     GU| 94107|
|     IL| 82126|
|     NJ| 76531|
|     MA| 70486|
|     WA| 55792|
|     GA| 44663|
|     MI| 32101|
|     VA| 31399|
|     PA| 30293|
|     DC| 28228|
|     NE| 26574|
|     MD| 25360|
|     NC| 23375|
+-------+------+
only showing top 20 rows



<br>

#### <font color='red'>Check for Duplicated Admission Number</font>

https://knowledge.udacity.com/questions/552714 <br>
**There are two factors affecting ADNUM.**
- ADNUM (Admission Number) is given to the individual every time he enters the country.
- An individual cannot enter the same country and have the same admission number two different times. You may want to check for duplicates.
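
The duplicate check itself is a count per value followed by a filter on counts above 1, which is what the `GROUP BY ... HAVING count > 1` queries below do. A minimal plain-Python sketch with made-up admission numbers (`find_duplicates` is a hypothetical helper):

```python
from collections import Counter

def find_duplicates(values):
    """Return {value: occurrences} for every value that appears more than once."""
    counts = Counter(values)
    return {value: n for value, n in counts.items() if n > 1}

# Made-up admission numbers, for illustration only
admnums = [1001, 1002, 1001, 1003, 1002, 1001]
print(find_duplicates(admnums))
```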

In [75]:
with pd.option_context('display.max_columns', None):
    display(spark.sql("""
                SELECT *
                FROM immigration7
                WHERE admnum IN( 
                    SELECT admnum FROM (
                        SELECT admnum, COUNT(*) AS count
                        FROM immigration7
                        GROUP BY admnum
                        HAVING count > 1 
                        LIMIT 1) AS temp)
    """).toPandas()
      
    )

                                                                                

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype,arrival_date,departure_date,visa_end_date,visa_category,travel_mode,age
0,5754147.0,2016.0,4.0,254.0,276.0,TOR,20574.0,1.0,NY,20579.0,33.0,1.0,1.0,20160430,,,H,O,,M,1983.0,7222016,F,,RS,59171240000.0,7612,WB,2016-04-30,2016-05-05,2016-07-22,Business,Air,33.0
1,4547695.0,2016.0,4.0,254.0,276.0,NYC,20568.0,1.0,NY,20571.0,33.0,1.0,1.0,20160424,,,G,O,,M,1983.0,7222016,F,,KE,59171240000.0,81,WB,2016-04-24,2016-04-27,2016-07-22,Business,Air,33.0


In [76]:
spark.sql("""
            SELECT admnum, COUNT(*) AS count
            FROM immigration
            GROUP BY admnum
            HAVING count > 1 
            LIMIT 2
    """).toPandas()

                                                                                

Unnamed: 0,admnum,count
0,59171240000.0,2
1,59171150000.0,2


In [77]:
spark.sql("""
    SELECT COUNT(*) AS duplicated_admnum FROM (
        SELECT admnum, COUNT(*) AS count
        FROM immigration
        GROUP BY admnum
        HAVING count > 1    
    ) 
""").show()



+-----------------+
|duplicated_admnum|
+-----------------+
|            19374|
+-----------------+



                                                                                

<BR>

https://www23.statcan.gc.ca/imdb/p3VD.pl?Function=getVD&TVD=53971

In [78]:
spark.sql("""
    SELECT COUNT(DISTINCT(i94addr)) AS num_of_states
    FROM immigration
""").show()

+-------------+
|num_of_states|
+-------------+
|          457|
+-------------+



In [79]:
spark.sql("""
    SELECT *
    FROM immigration
    WHERE i94addr=='52'
""").toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,4712921.0,2016.0,4.0,148.0,112.0,CHI,20569.0,1.0,52,20573.0,...,,M,1974.0,7232016,,,UA,59247340000.0,906,WB


In [80]:
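# Note: COUNT(i94addr) skips NULLs, so the null group shows 0 here (152592 rows have a missing i94addr).
spark.sql("""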
    SELECT i94addr, COUNT(i94addr) AS count
    FROM immigration
    GROUP BY i94addr
    ORDER BY count
""").show()

+-------+-----+
|i94addr|count|
+-------+-----+
|   null|    0|
|     06|    1|
|     OC|    1|
|     PW|    1|
|     CG|    1|
|     NF|    1|
|     EC|    1|
|     YH|    1|
|     N7|    1|
|     FC|    1|
|     RO|    1|
|     73|    1|
|     EV|    1|
|     EX|    1|
|     JS|    1|
|     PD|    1|
|     85|    1|
|     63|    1|
|     52|    1|
|     KF|    1|
+-------+-----+
only showing top 20 rows



In [81]:
spark.sql("""
    SELECT DISTINCT i94addr 
    FROM immigration
""").show()

+-------+
|i94addr|
+-------+
|     .N|
|     CI|
|     SC|
|     AZ|
|     IC|
|     PU|
|     UA|
|     EA|
|     NS|
|     KI|
|     RO|
|     PI|
|     LA|
|     NL|
|     MN|
|     BS|
|     11|
|     NK|
|     RE|
|     PL|
+-------+
only showing top 20 rows



<br>

In [82]:
# df_immigration = spark.sql("""SELECT * FROM immigration7""") 

<br>

<a id='temp'></a>
>#### 2- Temperature Data

<ul>
 <li><a href="#imm">1- Immigration Data Sample</a></li>
 <li><a href="#temp"><b>2- Temperature Data</b></a></li>
 <li><a href="#air">3- Airport Codes</a></li>
 <li><a href="#demo">4- US Cities Demographics</a></li>
 </ul>

- Commercial air travel didn't develop until after the Second World War, in the 1950s.

In [83]:
# https://stackoverflow.com/questions/22646623/how-to-read-text-files-in-a-zipped-folder-in-python

zfile = ZipFile("input_data/data2.zip", 'r')       
zfile.namelist()

['data2/',
 '__MACOSX/._data2',
 'data2/GlobalLandTemperaturesByCountry.csv',
 '__MACOSX/data2/._GlobalLandTemperaturesByCountry.csv',
 'data2/GlobalLandTemperaturesByMajorCity.csv',
 '__MACOSX/data2/._GlobalLandTemperaturesByMajorCity.csv',
 'data2/GlobalLandTemperaturesByState.csv',
 '__MACOSX/data2/._GlobalLandTemperaturesByState.csv',
 'data2/GlobalTemperatures.csv',
 '__MACOSX/data2/._GlobalTemperatures.csv',
 'data2/GlobalLandTemperaturesByCity.csv',
 '__MACOSX/data2/._GlobalLandTemperaturesByCity.csv']

In [84]:
with ZipFile("input_data/data2.zip") as zipf:
    with zipf.open("data2/GlobalLandTemperaturesByCity.csv", "r") as f:
        df_temp = pd.read_csv(f)

df_temp['dt'] = pd.to_datetime(df_temp['dt'])
print(df_temp.shape)
df_temp.head()

(8599212, 7)


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
0,1743-11-01,6.068,1.737,Århus,Denmark,57.05N,10.33E
1,1743-12-01,,,Århus,Denmark,57.05N,10.33E
2,1744-01-01,,,Århus,Denmark,57.05N,10.33E
3,1744-02-01,,,Århus,Denmark,57.05N,10.33E
4,1744-03-01,,,Århus,Denmark,57.05N,10.33E


In [85]:
df_temp['dt'].min(), df_temp['dt'].max()

(Timestamp('1743-11-01 00:00:00'), Timestamp('2013-09-01 00:00:00'))

> The temperature data ends in 2013-09, so there are no records for 2016 and it cannot be joined with our immigration dataset.

In [86]:
df_temp.Country.nunique()

159

In [87]:
df_temp.City.nunique()

3448

In [88]:
df_temp.Country.value_counts().head()

India            1014906
China             827802
United States     687289
Brazil            475580
Russia            461234
Name: Country, dtype: int64

<br>

#### <font color='red'>Filter on US</font>

In [89]:
# df_temp_us = df_temp.query("Country == 'United States'")
df_temp_us = df_temp[df_temp.Country == 'United States'].copy(deep=True)
print(df_temp_us.shape)
df_temp_us.head()

(687289, 7)


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


In [90]:
df_temp_us.isnull().sum()

dt                                   0
AverageTemperature               25765
AverageTemperatureUncertainty    25765
City                                 0
Country                              0
Latitude                             0
Longitude                            0
dtype: int64

In [91]:
df_temp_us.City.nunique()

248

- If longitude < 0 then West, else East.
- If latitude < 0 then South, else North.
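
The sign convention can be sketched as a small standalone parser. `parse_coordinate` is a hypothetical variant of the `change_format` function defined in the next cell; unlike that function, it raises an error on an unknown suffix instead of returning the input unchanged.

```python
def parse_coordinate(text):
    """Convert strings such as '32.95N' or '100.53W' to signed decimal degrees."""
    value, hemisphere = float(text[:-1]), text[-1].upper()
    if hemisphere in ("N", "E"):
        return value
    if hemisphere in ("S", "W"):
        return -value
    raise ValueError(f"Unknown hemisphere suffix in {text!r}")

print(parse_coordinate("32.95N"), parse_coordinate("100.53W"))
```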

In [92]:
def change_format(x):
    if x.endswith('N'):
        x = float(x[:-1])
    elif x.endswith('E'):
        x = float(x[:-1])
    elif x.endswith('S'):
        x = - float(x[:-1])
    elif x.endswith('W'):
        x = - float(x[:-1])
    return x

In [93]:
df_temp_us['Latitude'] = df_temp_us['Latitude'].apply(change_format)
df_temp_us['Longitude'] = df_temp_us['Longitude'].apply(change_format)
df_temp_us.head(2)

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95,-100.53
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95,-100.53


<br>

#### <font color='red'>Drop Country Column</font>

In [94]:
df_temp_us.Country.value_counts()

United States    687289
Name: Country, dtype: int64

In [95]:
df_temp_us.drop(columns=['Country'], inplace=True)

<br>

#### <font color='red'>Map City to its State</font>
https://github.com/agalea91/city_to_state_dictionary/blob/master/US%20City%20to%20State%20dictionary.ipynb
(there are cities with the same name in different states)
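
Because a city name alone is ambiguous, the lookup needs a second key. The sketch below shows the idea in plain Python: a name-only mapping with coordinate-specific overrides. `resolve_state` and `overrides` are hypothetical names; the sample values are taken from outputs shown later in this notebook.

```python
def resolve_state(city, latitude, longitude, city_to_state, overrides):
    """Look up a state, preferring a coordinate-specific override over the name-only mapping."""
    key = (city, latitude, longitude)
    if key in overrides:
        return overrides[key]
    # fall back to the name-only mapping; unknown cities get an empty string
    return city_to_state.get(city, "")

city_to_state = {"Abilene": "TX", "Brownsville": "NY"}
overrides = {("Brownsville", 26.52, -96.72): "TX"}

print(resolve_state("Brownsville", 26.52, -96.72, city_to_state, overrides))
print(resolve_state("Abilene", 32.95, -100.53, city_to_state, overrides))
```

The notebook applies the same idea with pandas by indexing both dataframes on (City, Latitude, Longitude) and calling `DataFrame.update`.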

<br>

In [96]:
with open('pickle/us_state_to_abbrev.pkl', 'rb') as f:
    us_state_to_abbrev = pickle.load(f)
    
# # invert the dictionary
# abbrev_to_us_state = dict(map(reversed, us_state_to_abbrev.items()))

In [97]:
# with open('pickle/city_to_state.pkl', 'wb') as f:
#     pickle.dump(city_to_state, f)
    
with open('pickle/city_to_state.pkl', 'rb') as f:
    city_to_state = pickle.load(f)

In [98]:
# city_to_st = {}
# for key, value in city_to_state.items():
#     try:
#         city_to_st[key] = us_state_to_abbrev[value]
#     except:
#         pass

In [99]:
# with open('pickle/city_to_st.pkl', 'wb') as f:
#     pickle.dump(city_to_st, f)
    
with open('pickle/city_to_st.pkl', 'rb') as f:
    city_to_st = pickle.load(f)

In [100]:
def get_st(city):
    # return the state abbreviation, or an empty string if the city is unknown
    return city_to_st.get(city, '')

In [101]:
# /opt/conda/lib/python3.6/site-packages/ipykernel_launcher.py:1: SettingWithCopyWarning: 
# A value is trying to be set on a copy of a slice from a DataFrame.
# Try using .loc[row_indexer,col_indexer] = value instead

# The above warning is raised when df_temp_us is a slice of df_temp;
# the .copy(deep=True) in "Filter on US" avoids it.

df_temp_us['State'] = df_temp_us['City'].apply(get_st) # no need for lambda
df_temp_us.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Latitude,Longitude,State
47555,1820-01-01,2.101,3.217,Abilene,32.95,-100.53,TX
47556,1820-02-01,6.926,2.853,Abilene,32.95,-100.53,TX
47557,1820-03-01,10.767,2.395,Abilene,32.95,-100.53,TX
47558,1820-04-01,17.989,2.202,Abilene,32.95,-100.53,TX
47559,1820-05-01,21.809,2.036,Abilene,32.95,-100.53,TX


<br>

#### <font color='red'>Handle Duplicated Cities</font>

In [102]:
with open('pickle/duplicated_cities.pkl', 'rb') as f:
    duplicated_cities = pickle.load(f)
    
len(duplicated_cities)

293

In [103]:
duplicated_df = df_temp_us[df_temp_us.City.isin(duplicated_cities)].groupby(['City', 'Latitude', 'Longitude'])['dt']\
.count().reset_index(name='count')

total_count = duplicated_df['count'].sum()
print(f'We have {duplicated_df.shape[0]} duplicated cities in the data with {total_count} Counts')
duplicated_df.head(2)

We have 77 duplicated cities in the data with 210454 Counts


Unnamed: 0,City,Latitude,Longitude,count
0,Alexandria,39.38,-76.99,3239
1,Arlington,32.95,-96.7,2325


https://gis.stackexchange.com/questions/372872/max-retries-exceeded-with-url-in-nominatim-with-geopy <br>
https://stackoverflow.com/questions/28667684/python-requests-getting-sslerror

**Comment the try - except block out to see the error**

In [104]:
import requests

def get_states(longs, latts):
    ''' Input two 1D lists of floats/ints '''
    # a list of states
    states = []
 
    for lon, lat in zip(longs, latts):
        url = f'https://nominatim.openstreetmap.org/reverse?lat={lat}&lon={lon}&format=json&accept-language=en'
        try:
            result = requests.get(url=url, verify=False)
            result_json = result.json()
#             print(result_json)
            # get the state name
            state = result_json['address']['state']
            st = us_state_to_abbrev[state]
        except:
            # return empty string
            st = ''
        states.append(st)
    return states

In [105]:
import warnings
warnings.filterwarnings('ignore')
get_states([-77.05803, -86.95444], [38.73289, 33.40178])

['VA', 'AL']

In [106]:
duplicated_df['State'] = get_states(duplicated_df.Longitude, duplicated_df.Latitude)

In [107]:
# duplicated_df['State'] = duplicated_df.apply(lambda x: get_states(x.Longitude, x.Latitude), axis=1)
duplicated_df.head(2)

Unnamed: 0,City,Latitude,Longitude,count,State
0,Alexandria,39.38,-76.99,3239,MD
1,Arlington,32.95,-96.7,2325,TX


In [108]:
# Note: 4 of the cities have the same Latitude and Longitude
# By Checking on Google Maps 32.95, -117.77 --> North Pacific Ocean --> near San Diego, California
# By Checking on Google Maps 26.52, -96.72 --> Gulf of Mexico --> near Texas

# Brownsville --> Texas
# Corona --> California
# Long Beach --> California
# Oceanside --> California
# Orange --> California

cities = ['Brownsville', 'Corona', 'Long Beach', 'Oceanside', 'Orange']
for city in cities:
    print(city, '-->', city_to_st[city])

duplicated_df[duplicated_df['State'] == '']

Brownsville --> NY
Corona --> CA
Long Beach --> CA
Oceanside --> CA
Orange --> CA


Unnamed: 0,City,Latitude,Longitude,count,State
10,Brownsville,26.52,-96.72,2289,
22,Corona,32.95,-117.77,1977,
44,Long Beach,32.95,-117.77,1977,
54,Oceanside,32.95,-117.77,1977,
55,Orange,32.95,-117.77,1977,


In [109]:
# Set the missing states for these cities manually

mask = (duplicated_df.Latitude==32.95) & (duplicated_df.Longitude==-117.77)
duplicated_df.loc[mask , 'State'] = "CA"

In [110]:
mask = (duplicated_df.Latitude==26.52) & (duplicated_df.Longitude==-96.72)
duplicated_df.loc[mask , 'State'] = "TX"
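
The two manual fixes above select rows by exact float equality. That works here because both sides come from the same parsed text, but a tolerance-based comparison is a safer habit. The sketch below uses `np.isclose` on made-up rows; `set_state` is a hypothetical helper, not part of the notebook.

```python
import numpy as np
import pandas as pd

# Made-up rows that mimic the shape of duplicated_df.
df = pd.DataFrame({
    "City": ["Corona", "Orange", "Brownsville", "Abilene"],
    "Latitude": [32.95, 32.95, 26.52, 32.95],
    "Longitude": [-117.77, -117.77, -96.72, -100.53],
    "State": ["", "", "", "TX"],
})


def set_state(frame, lat, lon, state):
    """Assign `state` to every row whose coordinates match within a tolerance."""
    mask = np.isclose(frame["Latitude"], lat) & np.isclose(frame["Longitude"], lon)
    frame.loc[mask, "State"] = state
    return int(mask.sum())  # number of rows updated


n_ca = set_state(df, 32.95, -117.77, "CA")
n_tx = set_state(df, 26.52, -96.72, "TX")
print(df)
```

Returning the number of updated rows gives a quick sanity check that the mask matched what was expected.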

In [111]:
duplicated_df[duplicated_df['City'].isin(cities)]

Unnamed: 0,City,Latitude,Longitude,count,State
10,Brownsville,26.52,-96.72,2289,TX
22,Corona,32.95,-117.77,1977,CA
44,Long Beach,32.95,-117.77,1977,CA
54,Oceanside,32.95,-117.77,1977,CA
55,Orange,32.95,-117.77,1977,CA


In [112]:
duplicated_df.drop(columns=['count'], inplace=True)
duplicated_df.head(1)

Unnamed: 0,City,Latitude,Longitude,State
0,Alexandria,39.38,-76.99,MD


In [113]:
# # Merge both dataframes and overwrite states
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.update.html

# Should have at least one matching index/column label with the original DataFrame. 
# If a Series is passed, its name attribute must be set, and that will be used as the column name 
# to align with the original DataFrame.

# We must set at least one matching index between both dataframes
df_temp_us = df_temp_us.set_index(['City', 'Latitude', 'Longitude'])
duplicated_df = duplicated_df.set_index(['City', 'Latitude', 'Longitude'])

# without City in the index --> Exception: cannot handle a non-unique multi-index!


df_temp_us.update(duplicated_df)

# # https://stackoverflow.com/questions/9787853/join-or-merge-with-overwrite-in-pandas
# duplicated_df.combine_first(df_temp_us) # Did not work
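
As a rough illustration of the alignment described in the comments above, `DataFrame.update` matches rows by index label and overwrites in place wherever the other frame has non-NA values. The rows below are made up; only the three-column key mirrors the notebook.

```python
import pandas as pd

# Made-up monthly rows: two cities named Arlington with the same (wrong) state.
temps = pd.DataFrame({
    "City": ["Arlington", "Arlington", "Arlington"],
    "Latitude": [32.95, 32.95, 38.81],
    "Longitude": [-96.70, -96.70, -77.10],
    "dt": ["1820-01-01", "1820-02-01", "1820-01-01"],
    "State": ["VA", "VA", "VA"],
})

# One corrected row per (City, Latitude, Longitude) key.
fixes = pd.DataFrame({
    "City": ["Arlington"],
    "Latitude": [32.95],
    "Longitude": [-96.70],
    "State": ["TX"],
})

key = ["City", "Latitude", "Longitude"]
temps = temps.set_index(key)
fixes = fixes.set_index(key)   # the index of `fixes` must be unique

temps.update(fixes)            # in place; only the shared "State" column changes
temps = temps.reset_index()
print(temps)
```

Keys that are absent from `fixes` keep their original values, so only the Texas rows change.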

In [114]:
df_temp_us.head(2)

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,dt,AverageTemperature,AverageTemperatureUncertainty,State
City,Latitude,Longitude,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
Abilene,32.95,-100.53,1820-01-01,2.101,3.217,TX
Abilene,32.95,-100.53,1820-02-01,6.926,2.853,TX


In [115]:
df_temp_us.reset_index(inplace=True)

In [116]:
mask = (df_temp_us.Latitude==26.52) & (df_temp_us.Longitude==-96.72)
df_temp_us[mask].head(2)

Unnamed: 0,City,Latitude,Longitude,dt,AverageTemperature,AverageTemperatureUncertainty,State
70199,Brownsville,26.52,-96.72,1823-01-01,16.028,2.767,TX
70200,Brownsville,26.52,-96.72,1823-02-01,15.151,3.087,TX


<br>

#### <font color='red'>Handle Missing States</font>

In [117]:
df_temp_us[df_temp_us.State == ''].City.value_counts()

Nuevo Laredo    2289
Name: City, dtype: int64

<br>

**'Nuevo Laredo': 'Tamaulipas'** <br>
The city of Laredo is situated in the U.S. state of Texas on the northern bank of the Rio Grande, and Nuevo Laredo is located in the Mexican state of Tamaulipas on the southern bank of the river. This area is also known as the Two Laredos or the Laredo Borderplex.

<br>

#### <font color='red'>Filter out 'Nuevo Laredo' Mexican City</font>

In [119]:
df_temp_us = df_temp_us[~(df_temp_us.City == 'Nuevo Laredo')].copy(deep=True)
print(df_temp_us.shape)
df_temp_us.head()

(685000, 7)


Unnamed: 0,City,Latitude,Longitude,dt,AverageTemperature,AverageTemperatureUncertainty,State
0,Abilene,32.95,-100.53,1820-01-01,2.101,3.217,TX
1,Abilene,32.95,-100.53,1820-02-01,6.926,2.853,TX
2,Abilene,32.95,-100.53,1820-03-01,10.767,2.395,TX
3,Abilene,32.95,-100.53,1820-04-01,17.989,2.202,TX
4,Abilene,32.95,-100.53,1820-05-01,21.809,2.036,TX


<br>

<br>

#### <font color='red'>Group by dt and State, then average</font>

In [120]:
df_temp_st = df_temp_us.groupby(['dt', 'State'])[['AverageTemperature', 'AverageTemperatureUncertainty']].mean().reset_index()
print(df_temp_st.shape)
df_temp_st.head()

(119056, 4)


Unnamed: 0,dt,State,AverageTemperature,AverageTemperatureUncertainty
0,1743-11-01,AL,9.97475,2.32875
1,1743-11-01,CT,4.668667,1.607
2,1743-11-01,FL,17.543857,2.351714
3,1743-11-01,GA,11.647,2.28
4,1743-11-01,IA,1.6675,2.5275
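
A small sketch of the same aggregation on made-up city rows, to show what the per-state mean does with missing values. `mean()` skips NaNs, and a group with no valid values comes out as NaN.

```python
import pandas as pd

# Made-up city-level rows; None marks a missing measurement.
city_temps = pd.DataFrame({
    "dt": ["1820-01-01"] * 3 + ["1820-02-01"] * 2,
    "State": ["TX", "TX", "CA", "TX", "CA"],
    "AverageTemperature": [2.0, 4.0, 10.0, 6.0, None],
    "AverageTemperatureUncertainty": [3.0, 3.4, 2.0, 2.8, None],
})

value_cols = ["AverageTemperature", "AverageTemperatureUncertainty"]
state_temps = (
    city_temps.groupby(["dt", "State"])[value_cols]
    .mean()            # NaNs are skipped within each (dt, State) group
    .reset_index()
)
print(state_temps)
```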


<br>

#### <font color='red'>Or Load GlobalLandTemperaturesByState</font>

In [128]:
zfile = ZipFile("input_data/data2.zip", 'r')       
zfile.namelist()

['data2/',
 '__MACOSX/._data2',
 'data2/GlobalLandTemperaturesByCountry.csv',
 '__MACOSX/data2/._GlobalLandTemperaturesByCountry.csv',
 'data2/GlobalLandTemperaturesByMajorCity.csv',
 '__MACOSX/data2/._GlobalLandTemperaturesByMajorCity.csv',
 'data2/GlobalLandTemperaturesByState.csv',
 '__MACOSX/data2/._GlobalLandTemperaturesByState.csv',
 'data2/GlobalTemperatures.csv',
 '__MACOSX/data2/._GlobalTemperatures.csv',
 'data2/GlobalLandTemperaturesByCity.csv',
 '__MACOSX/data2/._GlobalLandTemperaturesByCity.csv']

In [134]:
# from io import StringIO
# df_temperature = pd.read_csv(StringIO(f.read().decode()), lineterminator='\n')

with ZipFile("input_data/data2.zip") as zipf:
    with zipf.open("data2/GlobalLandTemperaturesByState.csv", "r") as f:
        df_temperature = pd.read_csv(f)
        
df_temp_usa = df_temperature[df_temperature.Country == 'United States'].copy(deep=True)
print(df_temp_usa.shape)
df_temp_usa.head()

(149745, 5)


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,State,Country
7458,1743-11-01,10.722,2.898,Alabama,United States
7459,1743-12-01,,,Alabama,United States
7460,1744-01-01,,,Alabama,United States
7461,1744-02-01,,,Alabama,United States
7462,1744-03-01,,,Alabama,United States
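
The pattern above reads a CSV straight out of the archive without extracting it. The self-contained sketch below reproduces it on an in-memory zip with made-up content, and also skips the `__MACOSX/` resource-fork entries seen in the listing. The member name `data2/states.csv` is hypothetical.

```python
import io
from zipfile import ZipFile

import pandas as pd

# Build a tiny in-memory archive that mimics the layout of data2.zip.
buffer = io.BytesIO()
with ZipFile(buffer, "w") as zf:
    zf.writestr(
        "data2/states.csv",
        "dt,AverageTemperature,State,Country\n"
        "1743-11-01,10.722,Alabama,United States\n"
        "1743-11-01,4.5,Acre,Brazil\n",
    )
    zf.writestr("__MACOSX/data2/._states.csv", "")  # macOS metadata entry

buffer.seek(0)
with ZipFile(buffer) as zf:
    # Keep real CSV members only, ignoring the macOS metadata folder.
    members = [
        name for name in zf.namelist()
        if name.endswith(".csv") and not name.startswith("__MACOSX/")
    ]
    with zf.open(members[0]) as f:
        df = pd.read_csv(f, parse_dates=["dt"])

usa = df[df["Country"] == "United States"].copy()
print(members, usa.shape)
```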


<a id='air'></a>
>#### 3- Airport Codes

<ul>
 <li><a href="#imm">1- Immigration Data Sample</a></li>
 <li><a href="#temp">2- Temperature Data</a></li>
 <li><a href="#air"><b>3- Airport Codes</b></a></li>
 <li><a href="#demo">4- US Cities Demographics</a></li>
 </ul>

**U.S. States**
- 'PA': 'PENNSYLVANIA'
- 'KS': 'KANSAS'

| - | Col | Description |
|-|-|-|
|1|ident| identification code |
|2|type| type of airport (7 types) <br> (small_airport - medium_airport - large_airport - heliport - seaplane_base - balloonport - closed)|
|3|name| Airport Name|
|4|elevation_ft| Airport elevation in ft |
|5|continent||
|6|iso_country||
|7|iso_region| US State |
|8|municipality| US City <br> A town or district that has local government <br> A municipal airport is an airport owned by a city or municipality. <br> البلدية / المجلس المحلي |
|9|gps_code||
|10|iata_code| International Air Transport Association airport code |
|11|local_code||
|12|coordinates||

An IATA airport code, also known as an IATA location identifier, IATA station code, or simply a location identifier, is a three-character alphanumeric geocode designating many airports and metropolitan areas around the world, defined by the International Air Transport Association.

In [135]:
df_port = pd.read_csv('input_data/airport-codes_csv.csv')
print(df_port.shape)
df_port.head(1)

(55075, 12)


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"


In [136]:
df_port.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   ident         55075 non-null  object 
 1   type          55075 non-null  object 
 2   name          55075 non-null  object 
 3   elevation_ft  48069 non-null  float64
 4   continent     27356 non-null  object 
 5   iso_country   54828 non-null  object 
 6   iso_region    55075 non-null  object 
 7   municipality  49399 non-null  object 
 8   gps_code      41030 non-null  object 
 9   iata_code     9189 non-null   object 
 10  local_code    28686 non-null  object 
 11  coordinates   55075 non-null  object 
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [137]:
df_port.duplicated().sum()

0

In [138]:
df_port.iso_country.nunique()

243

#### <font color='red'>Filter on US</font>

In [139]:
# Before we do that, let's check for missing data in the iso_country field.
df_port[df_port['iso_country'].isna()].shape

(247, 12)

In [140]:
df_port[df_port['iso_country'].isna()]['continent'].value_counts()

AF    247
Name: continent, dtype: int64

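> - All airports with a missing iso_country are in Africa. This is most likely Namibia, whose ISO code "NA" is parsed as NaN by `pd.read_csv` by default, rather than truly missing data.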
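
One plausible explanation for these 247 rows is that `pd.read_csv` treats the literal string `NA` as a missing value by default, which would hide both Namibia's country code and the North America continent code. The sketch below demonstrates that behavior on three made-up rows and shows one way to keep `NA` as text, using the `keep_default_na` and `na_values` parameters. Whether this is what happened in the airport file is an assumption, not something verified here.

```python
import io

import pandas as pd

# Three illustrative rows: a Namibian airport, a US airport, and an empty record.
raw = (
    "ident,continent,iso_country\n"
    "FYWH,AF,NA\n"
    "KJFK,NA,US\n"
    "XXXX,,\n"
)

# Default parsing: the string "NA" is converted to NaN.
default = pd.read_csv(io.StringIO(raw))

# Only treat empty fields as missing, so "NA" survives as a real code.
kept = pd.read_csv(io.StringIO(raw), keep_default_na=False, na_values=[""])

print(default.isna().sum())
print(kept)
```

With the second call, only the genuinely empty fields are counted as missing.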

In [141]:
df_port_us = df_port[df_port.iso_country == 'US'].copy(deep=True)
print(df_port_us.shape)
df_port_us.head(2)

(22757, 12)


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"


In [142]:
df_port_us.municipality.value_counts().head() # City

Houston        119
Wasilla         68
Los Angeles     66
Springfield     49
Columbus        48
Name: municipality, dtype: int64

<br>

#### <font color='red'>Exclude 'closed', 'heliport', 'seaplane_base', 'balloonport' Types</font>

In [143]:
df_port_us.type.value_counts()

small_airport     13720
heliport           6265
closed             1326
medium_airport      692
seaplane_base       566
large_airport       170
balloonport          18
Name: type, dtype: int64

In [144]:
# No immigration data is collected from balloonports, seaplane bases or heliports,
# since these means of transportation are used for recreational purposes or very short distances

exclude = ['closed', 'heliport', 'seaplane_base', 'balloonport']
df_port_us = df_port_us[~df_port_us['type'].isin(exclude)].copy()
print(df_port_us.shape)
df_port_us.head(2)

(14582, 12)


Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"


In [145]:
df_port_us.type.value_counts()

small_airport     13720
medium_airport      692
large_airport       170
Name: type, dtype: int64

In [146]:
df_port_us.isnull().sum()

ident               0
type                0
name                0
elevation_ft       63
continent       14582
iso_country         0
iso_region          0
municipality       50
gps_code          399
iata_code       12717
local_code        199
coordinates         0
dtype: int64

<br>

In [147]:
print(f"{df_port_us.iso_region.nunique()} States")

52 States


In [148]:
df_port_us[df_port_us['municipality'] == 'Abilene']

# Only one large airport

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
3937,35TX,small_airport,Flying B Ranch Airstrip,1750.0,,US,US-TX,Abilene,35TX,,35TX,"-99.82559967041016, 32.48400115966797"
7575,6TE2,small_airport,Zimmerle Airport,2057.0,,US,US-TX,Abilene,6TE2,,6TE2,"-99.597900390625, 32.270401000977"
8704,82TS,small_airport,Elmdale Airpark,1775.0,,US,US-TX,Abilene,82TS,,82TS,"-99.650398254395, 32.450099945068"
26006,KABI,medium_airport,Abilene Regional Airport,1791.0,,US,US-TX,Abilene,KABI,ABI,ABI,"-99.68190002440001, 32.4113006592"
26636,KDYS,large_airport,Dyess Air Force Base,1789.0,,US,US-TX,Abilene,KDYS,DYS,DYS,"-99.854598999, 32.4207992554"
27738,KK78,small_airport,Abilene Municipal Airport,1152.0,,US,US-KS,Abilene,K78,,K78,"-97.235900878906, 38.904098510742"
48147,TX00,small_airport,Abilene Executive Airpark,1822.0,,US,US-TX,Abilene,TX00,,TX00,"-99.62000274658203, 32.44889831542969"
48149,TX02,small_airport,Portlock Airfield,1780.0,,US,US-TX,Abilene,TX02,,TX02,"-99.61289978027344, 32.5265007019043"


<br>

In [149]:
df_port_us_l = df_port_us[df_port_us['type'].isin(['large_airport', 'medium_airport'])].copy(deep=True)
df_port_us_l.dropna(subset=['name', 'local_code'], inplace=True)

print(df_port_us_l.shape)
(df_port_us_l['iata_code'] == df_port_us_l['local_code']).sum()

(853, 12)


784

> iata_code = local_code for most of the large and medium airports (784 of 853)

In [150]:
df_port_us_l[df_port_us_l['iata_code'] != df_port_us_l['local_code']].head()

# NaN != NaN is True, so rows with a missing iata_code also show up here (only NaNs in name / local_code were dropped)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
6188,5A8,medium_airport,Aleknagik / New Airport,66.0,,US,US-AK,Aleknagik,5A8,WKK,5A8,"-158.617996216, 59.2826004028"
25825,K79J,medium_airport,South Alabama Regional At Bill Benton Field Ai...,310.0,,US,US-AL,Andalusia/Opp,K79J,,79J,"-86.393799, 31.3088"
26067,KAKR,medium_airport,Akron Fulton International Airport,1067.0,,US,US-OH,Akron,KAKR,AKC,AKR,"-81.4669036865, 41.0374984741"
26168,KBAK,medium_airport,Columbus Municipal Airport,656.0,,US,US-IN,Columbus,KBAK,CLU,BAK,"-85.8963012695, 39.2619018555"
26227,KBKF,medium_airport,Buckley Air Force Base,5662.0,,US,US-CO,Aurora,KBKF,BFK,BKF,"-104.751998901, 39.701698303200004"
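
Because `NaN != NaN` evaluates to `True`, a plain `!=` between two columns counts every row with a missing code as a mismatch. A short sketch on made-up codes separates genuine disagreements from rows where a value is simply missing.

```python
import numpy as np
import pandas as pd

# Made-up codes: one match, one missing IATA code, one real mismatch, both missing.
codes = pd.DataFrame({
    "iata_code": ["ABI", np.nan, "AKC", np.nan],
    "local_code": ["ABI", "79J", "AKR", np.nan],
})

# Naive comparison: any row containing a NaN is reported as different.
naive_diff = codes["iata_code"] != codes["local_code"]

# Only compare rows where both codes are present.
both_present = codes["iata_code"].notna() & codes["local_code"].notna()
true_diff = naive_diff & both_present

print(int(naive_diff.sum()), int(true_diff.sum()))
```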


In [151]:
df_port_us_l.municipality.value_counts().head(10)

Columbus        7
Jacksonville    6
Houston         5
Jackson         5
Atlanta         4
Las Vegas       4
Portland        4
Sacramento      4
Greenville      4
San Diego       4
Name: municipality, dtype: int64

<br>

#### <font color='red'>Remove US prefix from states</font>

In [152]:
df_port_us['iso_region'] = df_port_us['iso_region'].str[3:]  # drop the 'US-' prefix
df_port_us.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
5,00AS,small_airport,Fulton Airport,1100.0,,US,OK,Alex,00AS,,00AS,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810.0,,US,AZ,Cordes,00AZ,,00AZ,"-112.16500091552734, 34.305599212646484"
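
Dropping the first three characters works because every remaining row starts with `US-`. As a hedged alternative, an anchored regex only removes the prefix when it is actually present. The sketch below compares both on made-up values, including one non-US region to show the difference.

```python
import pandas as pd

regions = pd.Series(["US-KS", "US-AK", "CA-ON"])

# Positional slice: always drops the first three characters.
sliced = regions.str[3:]

# Anchored replace: removes the prefix only where the value starts with "US-".
stripped = regions.str.replace(r"^US-", "", regex=True)

print(sliced.tolist())
print(stripped.tolist())
```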


In [153]:
df_port_us.continent.isnull().sum() # All values are NaN (the code "NA" for North America is likely parsed as NaN on load), so we can fill all with "NA"

14582

In [154]:
df_port_us.continent.value_counts()

Series([], Name: continent, dtype: int64)

In [155]:
df_port_us[df_port_us.continent == 'AS'] # A US airport tagged 'AS' (Asia) would be wrong; none remain here

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates


|Code|Continent name|
|-|-|
|AF|Africa|
|AN|Antarctica|
|AS|Asia|
|EU|Europe|
|NA|North america|
|OC|Oceania|
|SA|South america|

> What is the difference between ident, gps_code and local_code?!!

In [156]:
df_port_us['ident'].isnull().sum()

0

In [157]:
df_port_us['gps_code'].isnull().sum()

399

In [158]:
df_port_us['local_code'].isnull().sum()

199

In [159]:
(df_port_us['ident'] != df_port_us['local_code']).sum()

4523

In [160]:
(df_port_us['ident'] != df_port_us['gps_code']).sum()

1074

In [161]:
mask = (df_port_us['local_code'] != df_port_us['ident']) & (df_port_us['local_code'].notnull())
df_port_us[mask].head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
221,03MT,small_airport,Cascade Field,3580.0,,US,MT,Cascade,3MT7,,3MT7,"-111.71748, 47.267327"
580,09TA,small_airport,Lazy G Bar Ranch Airport,923.0,,US,TX,Decatur,,,09T,"-97.497002, 33.282101"
636,0C7,small_airport,Grandpas' Farm Mendota Airport,727.0,,US,IL,Mendota,IL22,,IL22,"-89.132599, 41.521999"
693,0D9,small_airport,Air Park North,1170.0,,US,MI,Alba,MI30,,MI30,"-84.9587, 44.958"
813,0L5,small_airport,Goldfield Airport,5680.0,,US,NV,Goldfield,NV50,,NV50,"-117.236368, 37.722751"


In [162]:
df_port_us.local_code.value_counts().head()

5II5    2
LA36    2
46XS    2
1TS9    2
CVC     2
Name: local_code, dtype: int64

In [163]:
df_port_us.ident.value_counts().head()

00AA    1
KMO1    1
KMKV    1
KMKY    1
KMLB    1
Name: ident, dtype: int64

<br>

In [164]:
df_port_us.iata_code.isnull().sum() # most values are missing (12717 of 14582)

12717

In [165]:
df_port_us.iata_code.value_counts().head()

OCA    1
PMH    1
POY    1
POU    1
POF    1
Name: iata_code, dtype: int64

<br>

<a id='demo'></a>
>#### 4- US Cities Demographics

<ul>
 <li><a href="#imm">1- Immigration Data Sample</a></li>
 <li><a href="#temp">2- Temperature Data</a></li>
 <li><a href="#air">3- Airport Codes</a></li>
 <li><a href="#demo"><b>4- US Cities Demographics</b></a></li>
 </ul>
 
 
- https://knowledge.udacity.com/questions/552714 
- https://www.pewresearch.org/social-trends/2011/01/12/state-population-estimates-and-census-2010-counts-did-they-match/

- The Census Bureau's Population Estimates Program (PEP) produces estimates of the population for the United States, states, metropolitan and micropolitan statistical areas, counties, cities, towns, as well as for Puerto Rico and its municipios.
- Census population statistics cover age, sex, race, Hispanic origin, migration, ancestry, language use, veterans, as well as population estimates and projections.
- Unlike the census, which counts people directly, the State Population Estimates are assembled using government data, including birth and death certificates, immigration estimates and tax-return statistics on people who changed residences.
- It greatly increases confidence in the census count when population numbers derived using different methods are similar.

In [166]:
df_demo = pd.read_csv('input_data/us-cities-demographics.csv', delimiter=';')
print(df_demo.shape)
df_demo.head(2)

(2891, 12)


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723


In [167]:
df_demo.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

In [168]:
print(f"{df_demo.City.nunique()} Cities, {df_demo['State Code'].nunique()} States")

567 Cities, 49 States


In [169]:
df_demo[['City', 'State']].duplicated().sum()

2295

In [170]:
df_demo[df_demo['City']=='Quincy'] # Count is distributed by Race

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
289,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,Hispanic or Latino,2566
426,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,American Indian and Alaska Native,351
2322,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,Black or African-American,3917
2578,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,Asian,30473


> **Note:**
>- Unlike Total Population, Count is distributed by Race

In [171]:
df_demo.City.value_counts().head()

Bloomington    15
Columbia       15
Springfield    15
Jackson        10
Norwalk        10
Name: City, dtype: int64

In [172]:
df_demo.query("City == 'Quincy'").groupby('City')['Count'].sum()

City
Quincy    96030
Name: Count, dtype: int64

<br>

#### <font color='red'>Remove Race and Count & Drop Duplicates</font>

In [173]:
df_demo.drop(columns=['Race', 'Count'], inplace=True)
print(df_demo.shape)
df_demo.head(2)

(2891, 10)


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA


In [174]:
df_demo.drop_duplicates(inplace=True)
print(df_demo.shape)
df_demo.head(2)

(596, 10)


Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA


In [175]:
df_demo[['City', 'State']].duplicated().sum()

0
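
A compact sketch of the step above on made-up rows: dropping the per-race columns leaves identical city rows that `drop_duplicates` collapses. The second part is an optional alternative, not used in this notebook, that keeps the race counts by pivoting them into columns.

```python
import pandas as pd

# Made-up rows in the shape of the demographics file (one row per city and race).
demo = pd.DataFrame({
    "City": ["Quincy", "Quincy", "Silver Spring"],
    "State": ["Massachusetts", "Massachusetts", "Maryland"],
    "Total Population": [93629, 93629, 82463],
    "Race": ["White", "Asian", "Hispanic or Latino"],
    "Count": [58723, 30473, 25924],
})

# One row per city once the per-race columns are gone.
cities = (
    demo.drop(columns=["Race", "Count"])
    .drop_duplicates()
    .reset_index(drop=True)
)

# Optional alternative: keep the race breakdown as one column per race.
race_wide = demo.pivot_table(
    index=["City", "State"], columns="Race", values="Count",
    aggfunc="sum", fill_value=0,
)

print(cities)
print(race_wide)
```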

<br>

#### <font color='red'>Merge with "Airport Codes" to get local airport code</font>

In [176]:
print(df_demo.shape)
df_port_us_l.shape

(596, 10)


(853, 12)

> The same city has many airports

In [177]:
# df_demo2 = pd.merge(df_demo, df_port_us_l[['municipality', 'name', 'local_code']], how='left', 
#                     left_on='City', right_on='municipality')

# df_demo2 = df_demo2.sort_values('City')
# print(df_demo2.shape)
# df_demo2.head(10)

<br>

#### <font color='red'>GroupBy State</font>

In [178]:
df_demo_st = df_demo.groupby(['State', 'State Code'], as_index=False).agg({'Median Age': 'mean','Male Population': 'sum', 
                    'Female Population': 'sum', 'Total Population': 'sum', 'Number of Veterans': 'sum',
                    'Foreign-born': 'sum', 'Average Household Size': 'mean'})
print(df_demo_st.shape)
df_demo_st.head(2)

(49, 9)


Unnamed: 0,State,State Code,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size
0,Alabama,AL,36.228571,497248.0,552381.0,1049629,71543.0,52154.0,2.434286
1,Alaska,AK,32.2,152945.0,145750.0,298695,27492.0,33258.0,2.77
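
The aggregation above takes a plain mean of the city-level `Median Age`, which weights a small city the same as a large one. The sketch below, on made-up numbers, shows the same dictionary-style `agg` next to a population-weighted average as one possible refinement. The weighted version is an illustration only and is not what the notebook computes.

```python
import pandas as pd

# Made-up city rows.
cities = pd.DataFrame({
    "State": ["Alaska", "Alabama", "Alabama"],
    "State Code": ["AK", "AL", "AL"],
    "Median Age": [32.2, 38.0, 35.0],
    "Total Population": [298695, 200000, 100000],
})

# Same pattern as the notebook: mean for ratios, sum for counts.
by_state = cities.groupby(["State", "State Code"], as_index=False).agg(
    {"Median Age": "mean", "Total Population": "sum"}
)

# Population-weighted alternative: sum(age * population) / sum(population).
weighted = (
    cities.assign(age_x_pop=cities["Median Age"] * cities["Total Population"])
    .groupby("State Code")[["age_x_pop", "Total Population"]]
    .sum()
)
weighted_age = weighted["age_x_pop"] / weighted["Total Population"]

print(by_state)
print(weighted_age)
```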


In [179]:
# df_demo_st.to_csv("input_data/us-states-demographics.csv", index=False)

<br>

#### <font color='red'>Create an S3 bucket and save the cleaned data in it</font>
https://stackoverflow.com/questions/38154040/save-dataframe-to-csv-directly-to-s3-python/56275519#56275519

- Bucket name: nagy99
    - Bucket name must be globally unique and must not contain spaces or uppercase letters. 
- AWS Region: us-west-2

In [180]:
# aws_credentials = { "key": "***", "secret": "***", "token": "***" } 
# df.to_csv("s3://nagy99/example.csv", index=False, storage_options=aws_credentials)

<br>

<a id='step3'></a>
### Step 3: Define the Data Model
<ul>
 <li><a href="#intro">Project Summary</a></li>
 <li><a href="#step1">Step 1: Scope the Project and Gather Data</a></li>
 <li><a href="#step2">Step 2: Explore and Assess the Data</a></li>
 <li><a href="#step3"><b>Step 3: Define the Data Model</b></a></li>
 <li><a href="#step4">Step 4: Run ETL to Model the Data</a></li>
 <li><a href="#step5">Step 5: Complete Project Write Up</a></li>
 </ul>
 
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

We will use a "Star Schema" Model <br>
**Advantages of Star Schema:** 
- **Simpler Queries** <br>
The join logic of a star schema is much simpler than the join logic needed to fetch data from a highly normalized transactional schema.
- **Simplified Business Reporting Logic** <br>
Compared with a highly normalized transactional schema, the star schema simplifies common business reporting logic, such as as-of reporting and period-over-period reporting.
- **Feeding Cubes** <br>
Star schemas are widely used by OLAP systems to build OLAP cubes efficiently. In fact, most major OLAP systems provide a ROLAP mode of operation that can use a star schema directly as a source, without building a cube structure.
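
To make the "simpler queries" point concrete, here is a minimal pandas sketch of a star-schema query: one fact table joined to two dimension tables, each with a single key. The table and column names follow the model described in this step, but all rows are made up.

```python
import pandas as pd

# Fact table: one row per arrival (made-up values).
fact_immigration = pd.DataFrame({
    "cicid": [1, 2, 3],
    "arrival_date": pd.to_datetime(["2016-04-07", "2016-04-08", "2016-04-07"]),
    "us_state": ["FL", "FL", "NY"],
})

# Dimension tables: one row per date and one row per state.
dim_date = pd.DataFrame({
    "arrdate": pd.to_datetime(["2016-04-07", "2016-04-08"]),
    "arrival_weekday": [5, 6],  # Spark convention: 1 = Sunday
})
dim_demographics = pd.DataFrame({
    "state_code": ["FL", "NY"],
    "total_population": [100, 200],  # placeholder numbers
})

# Every dimension joins directly to the fact table on one key.
report = (
    fact_immigration
    .merge(dim_date, left_on="arrival_date", right_on="arrdate")
    .merge(dim_demographics, left_on="us_state", right_on="state_code")
    .groupby(["us_state", "arrival_weekday"], as_index=False)
    .agg(arrivals=("cicid", "count"))
)
print(report)
```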

### Fact Table
Since we're interested in the flow of travellers through the United States, the I94 data will serve as our fact table. 

- 1. immigration 

| - | Col | Description|
| --- | ---: | :---| 
|1|cicid|Application number / Citizenship and Immigration C...|
|**2**|**arrival_year**|**Arrival Year**|
|**3**|**arrival_month**|**Arrival Month**|
|4|citizinship|Country Immigrant is Originally From (country of citizenship)|
|5|residence|Country of Immigrant Residence|
|6|port|AIR / SEAPORT of entry into the US<br> ('XXX': 'NOT REPORTED/UNKNOWN' - '888': 'UNIDENTIFED AIR / SEAPORT' -'UNK': 'UNKNOWN POE')|
|**7**|**arrival_date**|**Arrival Date to USA**|
|8|travel_mode| (1: 'Air' - 2: 'Sea' - 3: 'Land' -  9: 'Not reported') |
|9|us_state|U.S. State / Address of Immigrant Inside USA <br> ('99'='All Other Codes') <br> actually representing the final address of the migrants, that is where they currently live in the US.|
|**10**|**departure_date**|**Departure Date from the USA**|
|**11**|**age**|**Age of Respondent in Years**|
|12|visa_category|Visa codes collapsed into three categories <br> (Business - Pleasure - Student)|
|13|dep_issued_visa|Department of State where Visa was issued - CIC does not use <br> This is where your visa was issued. It will be a U.S. embassy or U.S. consulate.|
|**14**|**visa_expiration_date**|**Character Date Field - Date to which admitted to U.S. (allowed to stay until) - CIC does not use <br> visa expiration date**|
|15|gender|Non-immigrant sex|
|16|airline|Airline used to arrive in U.S.|
|17|admission_number|Admission Number - An 11-digit number assigned to an alien when he enters the United States.|
|18|flight_number|Flight number of Airline used to arrive in U.S.|
|19|visa_type|VISATYPE - Class of admission legally admitting the non-immigrant to temporarily stay in U.S.|


  
### Dimension Tables

- 2. date - to aggregate the data using various time units <br>
     |-- arrdate: date (nullable = true) <br>
     |-- arrival_day: integer (nullable = true) <br>
     |-- arrival_week: integer (nullable = true) <br>
     |-- arrival_month: integer (nullable = true) <br>
     |-- arrival_year: integer (nullable = true) <br>
     |-- arrival_weekday: integer (nullable = true) <br>
<br> <br> 
- 3. demographics - To look at the demographic data of the areas with the most travelers <br>
     |-- City: string (nullable = true) <br>
     |-- State: string (nullable = true) <br>
     |-- median_age: double (nullable = true) <br>
     |-- male_population: integer (nullable = true) <br>
     |-- female_population: integer (nullable = true) <br>
     |-- total_population: integer (nullable = true) <br>
     |-- n_veterans: integer (nullable = true) <br>
     |-- foreign_born: integer (nullable = true) <br>
     |-- avg_household_size: double (nullable = true) <br>
     |-- state_code: string (nullable = true) <br>
     |-- Race: string (nullable = true) <br>
     |-- Count: integer (nullable = true) <br>

<br>

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

**Data Extraction:**
- Load all the datasets from CSV and SAS data files;

**Data Transformation and Loading:**

**fact_immigration:**
- Convert Dates (sas / string) to DateTime
- Add Visa Categories (1=Business - 2=Pleasure - 3=Student)
- Add travel modes (1=Air - 2=Sea - 3=Land - 9=Not Reported)
- Write to parquet

**dim_time:**
- Get all the arrival dates from the immigration dataset
- Extract year, month, day, and week from the date and insert all the values in the dim_time table
- Write to parquet

**dim_city_demographics:**
- Rename Columns
- Write to parquet
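
The fact-table transformations listed above can be sketched in plain Python. This assumes the SAS-formatted date columns hold the number of days since 1960-01-01, which is the SAS date epoch. `sas_to_date` and the two dictionaries are hypothetical names for illustration, and the code values are the ones listed in the steps above.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS dates count days from this day

VISA_CATEGORIES = {1: "Business", 2: "Pleasure", 3: "Student"}
TRAVEL_MODES = {1: "Air", 2: "Sea", 3: "Land", 9: "Not reported"}


def sas_to_date(days):
    """Convert a SAS day count (possibly a float or missing) to a date."""
    if days is None or days != days:  # `days != days` is True only for NaN
        return None
    return SAS_EPOCH + timedelta(days=int(days))


def label(code, mapping):
    """Map a numeric code such as 1.0 to its label, or None if unknown."""
    if code is None or code != code:
        return None
    return mapping.get(int(code))


print(sas_to_date(20545.0))           # 2016-04-01
print(label(2.0, VISA_CATEGORIES))    # Pleasure
print(label(9.0, TRAVEL_MODES))       # Not reported
```

In the Spark pipeline the same logic would typically live in a UDF or a column expression; the pure-Python version is only meant to show the arithmetic.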

<br>

<a id='step4'></a>
### Step 4: Run Pipelines to Model the Data 
<ul>
 <li><a href="#intro">Project Summary</a></li>
 <li><a href="#step1">Step 1: Scope the Project and Gather Data</a></li>
 <li><a href="#step2">Step 2: Explore and Assess the Data</a></li>
 <li><a href="#step3">Step 3: Define the Data Model</a></li>
 <li><a href="#step4"><b>Step 4: Run ETL to Model the Data</b></a></li>
 <li><a href="#step5">Step 5: Complete Project Write Up</a></li>
 </ul>
 
<a id='model'></a>
#### 4.1 Create the data model
Build the data pipelines to create the data model.
<ul>
 <li><a href="#model"><b>4.1 Create the data model</b></a></li>
 <li><a href="#check">4.2 Data Quality Checks</a></li>
 </ul>

https://knowledge.udacity.com/questions/566319

- If your scope is to provide analytics for a specific use case, creating a data warehouse is the right approach. A data lake is a storage space for both structured and unstructured raw data. 
- ETL, in contrast, is performed to create structured data for a specific type of analysis (i.e. a data warehouse object). 

In [181]:
# - %run -i '.py'
# - !python .py

t0 = time()
!python spark_etl.py
print(f"Duration: {(time()-t0)/60} Minutes")

### (Using S3 with Local Mode!!)
### This is not recommended at all in production
### This is going to load the data all the way from amazon,
### and into the memory of jupyter Notebook Machine

22/11/22 04:51:38 WARN Utils: Your hostname, Mahmouds-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.3 instead (on interface en0)
22/11/22 04:51:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
https://repos.spark-packages.org/ added as a remote repository with the name: repo-1
:: loading settings :: url = jar:file:/Users/mnagy99/opt/anaconda3/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/mnagy99/.ivy2/cache
The jars for the packages stored in: /Users/mnagy99/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f55bfbac-c19e-4519-93f7-a5037024f1a7;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;2.7.0 in central
	found org.apache.hadoop#hadoop-common;2.7.0 in central
	found org.apache.hadoop#hadoop-annotations;2.7.0 in central
	found com.google.guava#guava;11.0.2 

	0 artifacts copied, 68 already retrieved (0kB/26ms)
22/11/22 04:51:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/22 04:51:46 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/11/22 04:51:57 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Duration: 0.9164968490600586 Minutes                                            


<br>

In [182]:
# get filepath to immigration data file
path = "output_data/immigration"
    
# read immigration data file
df_immigration = spark.read\
          .parquet(path)

print(df_immigration.count())
df_immigration.printSchema()

with pd.option_context('display.max_columns', None):
    display(df_immigration.limit(5).toPandas())

                                                                                

3096313
root
 |-- cicid: double (nullable = true)
 |-- arrival_year: double (nullable = true)
 |-- arrival_month: double (nullable = true)
 |-- citizinship: double (nullable = true)
 |-- residence: double (nullable = true)
 |-- port: string (nullable = true)
 |-- arrival_date: date (nullable = true)
 |-- travel_mode: string (nullable = true)
 |-- departure_date: date (nullable = true)
 |-- age: double (nullable = true)
 |-- visa_category: string (nullable = true)
 |-- dep_issued_visa: string (nullable = true)
 |-- visa_expiration_date: date (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admission_number: double (nullable = true)
 |-- flight_number: string (nullable = true)
 |-- visa_type: string (nullable = true)
 |-- us_state: string (nullable = true)



Unnamed: 0,cicid,arrival_year,arrival_month,citizinship,residence,port,arrival_date,travel_mode,departure_date,age,visa_category,dep_issued_visa,visa_expiration_date,gender,airline,admission_number,flight_number,visa_type,us_state
0,1172235.0,2016.0,4.0,129.0,129.0,ORL,2016-04-07,Air,2016-04-09,31.0,Pleasure,,2016-07-05,F,CM,55739920000.0,434,WT,FL
1,1409781.0,2016.0,4.0,245.0,245.0,SFR,2016-04-08,Air,2016-04-13,36.0,Business,GUZ,2016-10-06,M,CX,92999300000.0,872,B1,FL
2,1172236.0,2016.0,4.0,129.0,129.0,ORL,2016-04-07,Air,2016-04-09,41.0,Business,,2016-07-05,F,B6,55777950000.0,1714,WB,FL
3,1409853.0,2016.0,4.0,245.0,245.0,HAM,2016-04-08,Air,2016-04-09,23.0,Business,GUZ,2016-10-07,M,DL,93024730000.0,657,B1,FL
4,1172237.0,2016.0,4.0,129.0,129.0,ORL,2016-04-07,Air,2016-04-10,49.0,Pleasure,,2016-07-05,F,B6,55744260000.0,1724,WT,FL


<br>

In [183]:
# create initial date df from arrdate column
date_df = df_spark.select(['arrdate']).distinct()

# expand df by adding other calendar columns
date_df = date_df.withColumn('arrival_day', F.dayofmonth('arrdate'))
date_df = date_df.withColumn('arrival_week', F.weekofyear('arrdate'))
date_df = date_df.withColumn('arrival_month', F.month('arrdate'))
date_df = date_df.withColumn('arrival_year', F.year('arrdate'))
date_df = date_df.withColumn('arrival_weekday', F.dayofweek('arrdate'))

# # optionally create an id field in the date df
# date_df = date_df.withColumn('id', F.monotonically_increasing_id())

# write the date dimension to parquet file
date_df.write.parquet('./output_data/' + "arrival_dates", mode="overwrite")

In [184]:
# get filepath to dates data file
path = "output_data/arrival_dates"
    
# read data file
df_dates = spark.read\
          .parquet(path)

# .option("recursiveFileLookup","true")\ --> This was preventing loading columns partitioned by

print(df_dates.count())
df_dates.printSchema()

with pd.option_context('display.max_columns', None):
    display(df_dates.limit(5).toPandas())

30
root
 |-- arrdate: date (nullable = true)
 |-- arrival_day: integer (nullable = true)
 |-- arrival_week: integer (nullable = true)
 |-- arrival_month: integer (nullable = true)
 |-- arrival_year: integer (nullable = true)
 |-- arrival_weekday: integer (nullable = true)



Unnamed: 0,arrdate,arrival_day,arrival_week,arrival_month,arrival_year,arrival_weekday
0,2016-04-25,25,17,4,2016,2
1,2016-04-22,22,16,4,2016,6
2,2016-04-30,30,17,4,2016,7
3,2016-04-26,26,17,4,2016,3
4,2016-04-04,4,14,4,2016,2
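
The count of 30 matches the number of days in April 2016, but a count alone does not show that every day is present. A minimal sketch of a completeness check follows, assuming the date dimension is expected to hold one row per calendar day of the month; `missing_dates` is a hypothetical helper, not part of the notebook, and the sample list stands in for `[row.arrdate for row in df_dates.collect()]`.

```python
from datetime import date, timedelta


def missing_dates(present_dates, start, end):
    """Return the calendar days in [start, end] that are absent from present_dates."""
    present = set(present_dates)
    n_days = (end - start).days + 1
    all_days = (start + timedelta(days=i) for i in range(n_days))
    return [day for day in all_days if day not in present]


# Toy input (assumption): in the notebook this would be the arrdate values of df_dates
sample = [date(2016, 4, 1), date(2016, 4, 2), date(2016, 4, 4)]
print(missing_dates(sample, date(2016, 4, 1), date(2016, 4, 4)))
```

An empty list means every day in the range is covered.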


<br>

In [185]:
# get filepath to demographics data file
path = "output_data/demographics"
    
# read data file
df_demographics = spark.read\
          .option("recursiveFileLookup","true")\
          .parquet(path)

print(df_demographics.count())
df_demographics.printSchema()

with pd.option_context('display.max_columns', None):
    display(df_demographics.limit(5).toPandas())

2891
root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- male_population: integer (nullable = true)
 |-- female_population: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- n_veterans: integer (nullable = true)
 |-- foreign_born: integer (nullable = true)
 |-- avg_household_size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)



Unnamed: 0,City,State,median_age,male_population,female_population,total_population,n_veterans,foreign_born,avg_household_size,state_code,Race,Count
0,Silver Spring,Maryland,33.8,40601,41862,82463,1562,30908,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129,49500,93629,4147,32935,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040,46799,84839,4819,8229,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127,87105,175232,5821,33878,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040,143873,281913,5829,86253,2.73,NJ,White,76402


<a id='check'></a>
#### 4.2 Data Quality Checks
Build the data pipelines to create the data model.
<ul>
 <li><a href="#model">4.1 Create the data model</a></li>
 <li><a href="#check"><b>4.2 Data Quality Checks</b></a></li>
 </ul>
 
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [186]:
# Perform quality checks here

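def quality_checks(df, table_name):
    """Run a row-count check on a fact or dimension table.

    Prints whether the table contains at least one record.

    Args:
        df: Spark DataFrame to check.
        table_name: Name of the table, used in the printed message.

    Returns:
        Always 0; the outcome of the check is only printed.
    """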
    total_count = df.count()

    if total_count == 0:
        print(f"Data quality check failed for {table_name} with zero records!")
    else:
        print(f"Data quality check passed for {table_name} with {total_count:,} records.")
    return 0

In [187]:
quality_checks(df_immigration, 'fact_immigration')



Data quality check passed for fact_immigration with 3,096,313 records.




0

In [188]:
quality_checks(df_demographics, 'dim_demographics')

Data quality check passed for dim_demographics with 2,891 records.


0

In [189]:
quality_checks(df_dates, 'dim_dates')

Data quality check passed for dim_dates with 30 records.


0
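
The checks above only print a message, so a failed check would not stop a scheduled run. The sketch below shows one possible way to turn the same counts into a source/count check that raises on failure. `find_count_failures` is a hypothetical helper, the actual counts are copied from the output above (in the notebook they would come from `df.count()`), and the expected value for `dim_dates` is an assumption based on April 2016 having 30 days.

```python
def find_count_failures(actual_counts, expected_counts=None):
    """Return one message per table that is empty or deviates from its expected count."""
    expected_counts = expected_counts or {}
    failures = []
    for table_name, actual in actual_counts.items():
        expected = expected_counts.get(table_name)
        if actual == 0:
            failures.append(f"{table_name}: zero records")
        elif expected is not None and actual != expected:
            failures.append(
                f"{table_name}: expected {expected:,} records, found {actual:,}"
            )
    return failures


# Row counts as printed by the checks above
actual = {
    "fact_immigration": 3_096_313,
    "dim_demographics": 2_891,
    "dim_dates": 30,
}
# Assumption: one row per calendar day of April 2016
expected = {"dim_dates": 30}

failures = find_count_failures(actual, expected)
if failures:
    raise ValueError("Data quality checks failed: " + "; ".join(failures))
print("All count checks passed.")
```

Tables without an entry in `expected` are only checked for being non-empty, which matches the behaviour of `quality_checks`.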

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.
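
As an illustration, the sketch below renders a data dictionary for the date dimension as a Markdown table. The field names and types are copied from the `printSchema()` output of `output_data/arrival_dates`, and the descriptions follow the Spark functions used to derive each column. `DIM_DATES_FIELDS` and `to_markdown_table` are hypothetical names introduced here; the other tables could be documented in the same way.

```python
# Field name, type, and description for each column of the date dimension
DIM_DATES_FIELDS = [
    ("arrdate", "date", "Arrival date from the I94 immigration data"),
    ("arrival_day", "integer", "Day of the month, from F.dayofmonth"),
    ("arrival_week", "integer", "Week of the year, from F.weekofyear"),
    ("arrival_month", "integer", "Month number (1-12), from F.month"),
    ("arrival_year", "integer", "Four-digit year, from F.year"),
    ("arrival_weekday", "integer",
     "Day of the week (1 = Sunday, 7 = Saturday), from F.dayofweek"),
]


def to_markdown_table(fields):
    """Render (name, type, description) tuples as a Markdown table."""
    lines = ["| Field | Type | Description |", "| --- | --- | --- |"]
    for name, dtype, description in fields:
        lines.append(f"| {name} | {dtype} | {description} |")
    return "\n".join(lines)


print(to_markdown_table(DIM_DATES_FIELDS))
```

The printed table can be pasted into a Markdown cell or written to a separate file.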

<a id='step5'></a>
### Step 5: Complete Project Write Up
<ul>
 <li><a href="#intro">Project Summary</a></li>
 <li><a href="#step1">Step 1: Scope the Project and Gather Data</a></li>
 <li><a href="#step2">Step 2: Explore and Assess the Data</a></li>
 <li><a href="#step3">Step 3: Define the Data Model</a></li>
 <li><a href="#step4">Step 4: Run ETL to Model the Data</a></li>
 <li><a href="#step5"><b>Step 5: Complete Project Write Up</b></a></li>
 </ul>

* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

**Clearly state the rationale for the choice of tools and technologies for the project.**
- Considering the size of the immigration dataset (about 3 million rows for a single month), Spark is the most sensible technology choice, especially if we were to process data over a longer period of time.

- Apache Spark was used because:
    - it can read multiple file formats and handle large amounts of data;
    - it provides a fast, unified analytics engine for big data;
    - it has easy-to-use APIs for operating on large datasets.

**Propose how often the data should be updated and why.**
- The I94 immigration data is updated monthly, so the pipeline should be rerun monthly as well.
- A monthly update is sufficient for the needs of this study.


### Write a description of how you would approach the problem differently under the following scenarios:
**The data was increased by 100x**
- Spark can handle the increase, but we would consider increasing the number of nodes in our cluster.
- We would still use Spark as our data processing platform, since it is well suited to very large datasets.
- The data would be stored in an Amazon S3 bucket (instead of on the EMR cluster alongside the staging tables) and loaded from there into the staging tables.


**The data populates a dashboard that must be updated on a daily basis by 7am every day.**
- We would use Apache Airflow to schedule the data pipeline to run daily, early enough for it to finish before 7 a.m.


**The database needed to be accessed by 100+ people:**
- We would move our analytics database into Amazon Redshift.
- Once the data is ready to be consumed, it would be loaded into tables on a Redshift cluster, which is queried with PostgreSQL-compatible SQL and supports multiuser access.

<br>

<br>