# Constructing a Nonimmigrant SSOT Based on U.S. Temperature and Demographics Data



### Data Engineering Capstone Project

#### Project Summary

The goal of this project is to bring the benefits of BI and a single source of truth (SSOT) database to industries related to immigration and aviation, as well as to associated government agencies.\
The final data model can answer the following questions:
- Residents of which countries visited the U.S. the most in a given month?
- Which port had the most nonimmigrant arrivals in 2016?
- How did demographics change over a given time period?
- What is the relationship between temperature and the foreign-born population in U.S. cities?
- Which airline was used the most in a specific time period or over all of 2016?
- What is the most common visit purpose in cities whose population exceeds a given threshold?

The project follows the steps below:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
# Do all imports and installs here
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import os
import logging

In [None]:
# Creating spark session
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()


In [None]:
# Set the root logger to INFO so that logging.info messages are displayed
logger = logging.getLogger()
logger.setLevel(logging.INFO)

### Step 1: Scope the Project and Gather Data

---

#### Scope 
In this project, several immigration-related datasets are used to build a data warehouse. ETL pipelines transform the data into a form that meets the standards of data analysis and BI operations. To do this, the project uses the ***Apache Spark*** big data tool, the ***AWS Redshift*** cloud data warehouse service and the ***Apache Airflow*** workflow orchestration tool.
#### Describe and Gather Data 
The data comes from several sources, listed below:
- I94 nonimmigrant data from [i94_data](https://www.trade.gov/national-travel-and-tourism-office)
- World temperature data from [temp_data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data)
- U.S. demographics data from [demog_data](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/)
- Airport code table from [port](https://datahub.io/core/airport-codes#data)

All datasets used in the project are inspected in this section.

#### 1. Global Temperature Dataset Observation

In [None]:
# Read in the data here
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
size_temp = os.path.getsize(fname) * 10**(-6)
logging.info(f"Size of global land temperature by city: {size_temp} megabytes")
df_temp = pd.read_csv(fname)
# Keep only U.S. cities
df_temp = df_temp[df_temp["Country"] == "United States"]

In [None]:
# Temperature at specific US city
df_temp_us_city = df_temp[df_temp["City"] == "New York"]
df_temp_us_city.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 3239 entries, 5203973 to 5207211
Data columns (total 7 columns):
dt                               3239 non-null object
AverageTemperature               3119 non-null float64
AverageTemperatureUncertainty    3119 non-null float64
City                             3239 non-null object
Country                          3239 non-null object
Latitude                         3239 non-null object
Longitude                        3239 non-null object
dtypes: float64(2), object(5)
memory usage: 202.4+ KB


In [None]:
df_temp.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
47555,1820-01-01,2.101,3.217,Abilene,United States,32.95N,100.53W
47556,1820-02-01,6.926,2.853,Abilene,United States,32.95N,100.53W
47557,1820-03-01,10.767,2.395,Abilene,United States,32.95N,100.53W
47558,1820-04-01,17.989,2.202,Abilene,United States,32.95N,100.53W
47559,1820-05-01,21.809,2.036,Abilene,United States,32.95N,100.53W


In [None]:
df_temp.tail()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude
8439242,2013-05-01,15.544,0.281,Yonkers,United States,40.99N,74.56W
8439243,2013-06-01,20.892,0.273,Yonkers,United States,40.99N,74.56W
8439244,2013-07-01,24.722,0.279,Yonkers,United States,40.99N,74.56W
8439245,2013-08-01,21.001,0.323,Yonkers,United States,40.99N,74.56W
8439246,2013-09-01,17.408,1.048,Yonkers,United States,40.99N,74.56W


In [None]:
df_temp.info()
f"{len(df_temp):,}"

<class 'pandas.core.frame.DataFrame'>
Int64Index: 687289 entries, 47555 to 8439246
Data columns (total 7 columns):
dt                               687289 non-null object
AverageTemperature               661524 non-null float64
AverageTemperatureUncertainty    661524 non-null float64
City                             687289 non-null object
Country                          687289 non-null object
Latitude                         687289 non-null object
Longitude                        687289 non-null object
dtypes: float64(2), object(5)
memory usage: 41.9+ MB


'687,289'

In [None]:
df_temp["Country"].drop_duplicates().head()

47555    United States
Name: Country, dtype: object

In [None]:
# Create a smaller sample of the raw temperature data so that tests run more quickly
df = pd.read_csv("../../data2/GlobalLandTemperaturesByCity.csv")
dfshort=df.iloc[0:1599211]
dfshort.to_csv("sample_citytemp_csv")
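The cell above reads the entire CSV into memory and only then slices it. A lighter alternative, sketched below as an assumption rather than the project's own code, is to let `pandas.read_csv` stop parsing early through its `nrows` parameter. The helper name `make_sample_csv` is hypothetical, and unlike the cell above it does not write the row index.

```python
import pandas as pd


def make_sample_csv(src_path, dst_path, n_rows: int) -> pd.DataFrame:
    """Write the first `n_rows` data rows of `src_path` to `dst_path`.

    Hypothetical helper: `nrows` makes pandas stop parsing after n_rows,
    so the full file never has to be held in memory.
    """
    sample = pd.read_csv(src_path, nrows=n_rows)
    # index=False keeps the sample's columns identical to the source file
    sample.to_csv(dst_path, index=False)
    return sample


# Example call mirroring the cell above (paths as used in this notebook):
# make_sample_csv("../../data2/GlobalLandTemperaturesByCity.csv", "sample_citytemp_csv", 1_599_211)
```

Both `src_path` and `dst_path` may also be file-like objects, which is convenient for quick checks.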

#### 2. Immigration Dataset Observation


##### List of I94 immigration data files on disk

In [None]:
root_dir='../../data/18-83510-I94-Data-2016'
sub_files=os.listdir(root_dir)
sub_files

['i94_apr16_sub.sas7bdat',
 'i94_sep16_sub.sas7bdat',
 'i94_nov16_sub.sas7bdat',
 'i94_mar16_sub.sas7bdat',
 'i94_jun16_sub.sas7bdat',
 'i94_aug16_sub.sas7bdat',
 'i94_may16_sub.sas7bdat',
 'i94_jan16_sub.sas7bdat',
 'i94_oct16_sub.sas7bdat',
 'i94_jul16_sub.sas7bdat',
 'i94_feb16_sub.sas7bdat',
 'i94_dec16_sub.sas7bdat']

##### Size of the data

In [None]:
file_sizes=[]
for file in sub_files:
    fname = os.path.join(root_dir,file)
    size=(os.path.getsize(fname))*10**(-6)
    file_size =[fname,size]
    file_sizes.append(file_size)
file_sizes_df = pd.DataFrame(file_sizes,columns= ["file","Size_(Megabyte)"])
file_sizes_df.loc['Total Size'] = pd.Series(file_sizes_df['Size_(Megabyte)'].sum(), index = ['Size_(Megabyte)'])
file_sizes_df = file_sizes_df.replace(np.nan,"")
logging.info(f"Size of the batch data calculated as:\n {file_sizes_df}")


INFO:root:Size of the batch data calculated as:
                                                          file  Size_(Megabyte)
0           ../../data/18-83510-I94-Data-2016/i94_apr16_su...       471.990272
1           ../../data/18-83510-I94-Data-2016/i94_sep16_su...       569.180160
2           ../../data/18-83510-I94-Data-2016/i94_nov16_su...       444.334080
3           ../../data/18-83510-I94-Data-2016/i94_mar16_su...       481.296384
4           ../../data/18-83510-I94-Data-2016/i94_jun16_su...       716.570624
5           ../../data/18-83510-I94-Data-2016/i94_aug16_su...       625.541120
6           ../../data/18-83510-I94-Data-2016/i94_may16_su...       525.008896
7           ../../data/18-83510-I94-Data-2016/i94_jan16_su...       434.176000
8           ../../data/18-83510-I94-Data-2016/i94_oct16_su...       556.269568
9           ../../data/18-83510-I94-Data-2016/i94_jul16_su...       650.117120
10          ../../data/18-83510-I94-Data-2016/i94_feb16_su...       391.905280
11 

##### Inspecting the April 2016 I94 immigration data file

In [None]:
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
#fname = os.path.join(root_dir,sub_files[0])
immigration_file_size = (os.path.getsize(fname))*10**(-6)
logging.info(f"Immigration batch data file size {immigration_file_size} megabytes.")
df_im = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [None]:
# Sample of records where the occupation field is not null
df_im_occupation = df_im[df_im["occup"].notnull()]
df_im_occupation.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
1818,2092.0,2016.0,4.0,105.0,105.0,NYC,20545.0,1.0,NY,20622.0,30.0,1.0,1.0,20160401,FRN,ELT,G,N,,M,1986.0,3312018,F,,AB,92467970000.0,7450,E2
2350,2715.0,2016.0,4.0,107.0,107.0,CHI,20545.0,1.0,IL,20569.0,35.0,1.0,1.0,20160401,WRW,PHS,G,O,,M,1981.0,9302016,M,,UA,92489070000.0,953,B1
3169,3639.0,2016.0,4.0,108.0,108.0,NYC,20545.0,1.0,NY,,23.0,1.0,1.0,20160401,CPN,EXA,G,,,,1993.0,3312018,F,,DY,92513670000.0,7011,E2
8927,10527.0,2016.0,4.0,111.0,111.0,NYC,20545.0,1.0,NY,20550.0,34.0,1.0,1.0,20160401,PRS,EXA,G,O,,M,1982.0,3312018,M,,AF,92514370000.0,8,E1
8928,10528.0,2016.0,4.0,111.0,111.0,NYC,20545.0,1.0,NY,20554.0,42.0,1.0,1.0,20160401,PRS,EXA,G,O,,M,1974.0,3312018,M,,AF,92513660000.0,8,E1


In [None]:
pd.set_option('display.max_columns', None)

In [None]:
df_im.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [None]:
df_im.tail()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
3096308,625229.0,2016.0,4.0,745.0,745.0,SYS,20547.0,3.0,CA,,36.0,2.0,1.0,20160403,,,Z,,,,1980.0,5082016,,,,78934560000.0,00066,B2
3096309,1972204.0,2016.0,4.0,745.0,745.0,SYS,20554.0,3.0,CA,20555.0,36.0,2.0,1.0,20160410,BLG,,Z,Q,,M,1980.0,9102016,F,,,90300540000.0,00066,B2
3096310,4249448.0,2016.0,4.0,745.0,745.0,TEC,20566.0,3.0,VA,20588.0,23.0,2.0,1.0,20160422,BLG,,Z,O,,M,1993.0,9202016,F,,,91416720000.0,00651,B2
3096311,5658953.0,2016.0,4.0,748.0,748.0,NEW,20573.0,3.0,MN,,57.0,2.0,1.0,20160429,CLG,,Z,,,,1959.0,10282016,M,,,94887100000.0,LAND,B2
3096312,3106671.0,2016.0,4.0,123.0,749.0,NOG,20561.0,3.0,AZ,20567.0,58.0,1.0,1.0,20160417,,,Z,O,,M,1958.0,7102016,M,,,56056870000.0,00866,WB


In [None]:
pd.reset_option('display.max_columns')

In [None]:
df_im.info()
f"{len(df_im):,}"

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3096313 entries, 0 to 3096312
Data columns (total 28 columns):
cicid       float64
i94yr       float64
i94mon      float64
i94cit      float64
i94res      float64
i94port     object
arrdate     float64
i94mode     float64
i94addr     object
depdate     float64
i94bir      float64
i94visa     float64
count       float64
dtadfile    object
visapost    object
occup       object
entdepa     object
entdepd     object
entdepu     object
matflag     object
biryear     float64
dtaddto     object
gender      object
insnum      object
airline     object
admnum      float64
fltno       object
visatype    object
dtypes: float64(13), object(15)
memory usage: 661.4+ MB


'3,096,313'
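In the `df_im.info()` output above, `arrdate` and `depdate` are `float64`. They appear to be SAS date values, i.e. day counts since 1960-01-01: the sample value 20545 falls on 2016-04-01, which matches the `dtadfile` of 20160401 in the same rows. A minimal sketch of converting them with `pandas.to_datetime`, assuming that SAS epoch; the function name and the two sample rows are illustrative.

```python
import pandas as pd

# Assumption: SAS stores dates as the number of days since 1960-01-01
SAS_EPOCH = "1960-01-01"


def sas_days_to_datetime(values: pd.Series) -> pd.Series:
    """Convert SAS day counts (float, possibly NaN) to pandas datetimes.

    Missing values, such as an absent departure date, become NaT.
    """
    return pd.to_datetime(values, unit="D", origin=SAS_EPOCH)


# Two illustrative rows using values seen in the April 2016 sample
df = pd.DataFrame({"arrdate": [20545.0, 20573.0], "depdate": [20622.0, float("nan")]})
for col in ("arrdate", "depdate"):
    df[col] = sas_days_to_datetime(df[col])
print(df)
```

The same call could be applied to the real columns, e.g. `df_im["arrdate"] = sas_days_to_datetime(df_im["arrdate"])`.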

#### 3. U.S. Demographics Dataset Observation

In [None]:
pname="us-cities-demographics.csv"
df_dem=pd.read_csv(pname, sep=";")
df_dem.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [None]:
df_dem.tail()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
2886,Stockton,California,32.5,150976.0,154674.0,305650,12822.0,79583.0,3.16,CA,American Indian and Alaska Native,19834
2887,Southfield,Michigan,41.6,31369.0,41808.0,73177,4035.0,4011.0,2.27,MI,American Indian and Alaska Native,983
2888,Indianapolis,Indiana,34.1,410615.0,437808.0,848423,42186.0,72456.0,2.53,IN,White,553665
2889,Somerville,Massachusetts,31.0,41028.0,39306.0,80334,2103.0,22292.0,2.43,MA,American Indian and Alaska Native,374
2890,Coral Springs,Florida,37.2,63316.0,66186.0,129502,4724.0,38552.0,3.17,FL,White,90896


In [None]:
df_dem.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
City                      2891 non-null object
State                     2891 non-null object
Median Age                2891 non-null float64
Male Population           2888 non-null float64
Female Population         2888 non-null float64
Total Population          2891 non-null int64
Number of Veterans        2878 non-null float64
Foreign-born              2878 non-null float64
Average Household Size    2875 non-null float64
State Code                2891 non-null object
Race                      2891 non-null object
Count                     2891 non-null int64
dtypes: float64(6), int64(2), object(4)
memory usage: 271.1+ KB


#### 4. Airport Codes Dataset Observation

In [None]:
file_loc='airport-codes_csv.csv'
df_aircode=pd.read_csv(file_loc)
df_aircode.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
ident           55075 non-null object
type            55075 non-null object
name            55075 non-null object
elevation_ft    48069 non-null float64
continent       27356 non-null object
iso_country     54828 non-null object
iso_region      55075 non-null object
municipality    49399 non-null object
gps_code        41030 non-null object
iata_code       9189 non-null object
local_code      28686 non-null object
coordinates     55075 non-null object
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [None]:
df_aircode.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [None]:
df_aircode.tail()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
55070,ZYYK,medium_airport,Yingkou Lanqi Airport,0.0,AS,CN,CN-21,Yingkou,ZYYK,YKH,,"122.3586, 40.542524"
55071,ZYYY,medium_airport,Shenyang Dongta Airport,,AS,CN,CN-21,Shenyang,ZYYY,,,"123.49600219726562, 41.784400939941406"
55072,ZZ-0001,heliport,Sealand Helipad,40.0,EU,GB,GB-ENG,Sealand,,,,"1.4825, 51.894444"
55073,ZZ-0002,small_airport,Glorioso Islands Airstrip,11.0,AF,TF,TF-U-A,Grande Glorieuse,,,,"47.296388888900005, -11.584277777799999"
55074,ZZZZ,small_airport,Satsuma Iōjima Airport,338.0,AS,JP,JP-46,Mishima-Mura,RJX7,,,"130.270556, 30.784722"


In [None]:
df_aircode_city=df_aircode[df_aircode['municipality']=='New York']
df_aircode_city.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
7431,6N5,heliport,East 34th Street Heliport,10.0,,US,US-NY,New York,6N5,TSS,6N5,"-73.97209930419922, 40.74259948730469"
7432,6N6,seaplane_base,Evers Seaplane Base,,,US,US-NY,New York,6N6,,6N6,"-73.81620025634766, 40.84590148925781"
7433,6N7,seaplane_base,New York Skyports Inc Seaplane Base,,,US,US-NY,New York,,QNY,6N7,"-73.9729, 40.734001"
25070,JPB,closed,Pan Am Building Heliport,870.0,,US,US-NY,New York,,JPB,,"-73.9765, 40.7533"
25072,JRA,heliport,West 30th St. Heliport,7.0,,US,US-NY,New York,KJRA,JRA,JRA,"-74.007103, 40.754501"


In [None]:
df_aircode.query("iso_country=='US' & iata_code=='ALB' ")

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
26068,KALB,medium_airport,Albany International Airport,285.0,,US,US-NY,Albany,KALB,ALB,ALB,"-73.80169677734375, 42.74829864501953"


In [None]:
df_aircode.query("iso_country=='US' & iata_code=='FLX'")

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
27195,KFLX,small_airport,Fallon Municipal Airport,3963.0,,US,US-NV,Fallon,KFLX,FLX,FLX,"-118.749000549, 39.4990997314"


#### 5. Immigration Dataset Observation (via PySpark)


In [None]:
# Convert each monthly .sas7bdat file to Parquet (e.g. sas_data/apr16)
root_dir='../../data/18-83510-I94-Data-2016'
sub_files=os.listdir(root_dir)
for file in sub_files:
    df_spark = spark.read.format('com.github.saurfang.sas.spark').load(os.path.join(root_dir, file))
    prefix=file[4:9]   # 'i94_apr16_sub.sas7bdat' -> 'apr16'
    df_spark.write.mode("overwrite").parquet(f"sas_data/{prefix}")

In [None]:
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [None]:
# Read the April 2016 Parquet output written by the conversion loop above
df_spark=spark.read.parquet("sas_data/apr16")

In [None]:
pd.set_option('display.max_columns', None)
df_spark.limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,37.0,2.0,1.0,,,,T,,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,25.0,3.0,1.0,20130811.0,SEO,,G,,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,55.0,2.0,1.0,20160401.0,,,T,O,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,28.0,2.0,1.0,20160401.0,,,O,O,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,4.0,2.0,1.0,20160401.0,,,O,O,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,57.0,1.0,1.0,20160401.0,,,O,O,,M,1959.0,09302016,,,AZ,92471040000.0,602.0,B1
6,19.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,63.0,2.0,1.0,20160401.0,,,O,K,,M,1953.0,09302016,,,AZ,92471400000.0,602.0,B2
7,20.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NJ,20558.0,57.0,2.0,1.0,20160401.0,,,O,K,,M,1959.0,09302016,,,AZ,92471610000.0,602.0,B2
8,21.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20553.0,46.0,2.0,1.0,20160401.0,,,O,O,,M,1970.0,09302016,,,AZ,92470800000.0,602.0,B2
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,48.0,1.0,1.0,20160401.0,,,O,O,,M,1968.0,09302016,,,AZ,92478490000.0,608.0,B1


In [None]:
#df_spark.dtypes()
df_spark.printSchema()


root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [None]:
f'{df_spark.count():,}'

'3,096,313'

### Step 2: Explore and Assess the Data

---

#### Explore the Data 
This section identifies what needs to be handled in the data before it can be processed.



##### SAS Labels Descriptions File (`I94_SAS_Labels_Descriptions.SAS`)
    
- The file was inspected to understand the coded fields of the `i94 immigration data`, and its descriptions were applied to the I94 immigration data.
- The I94 nonimmigrant data itself was cleaned in the `data_prep.py` file.
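`data_prep.py` is not reproduced in this notebook, so the following is only an assumed sketch of how the coded `value` blocks of `I94_SAS_Labels_Descriptions.SAS` could be turned into a lookup table. It assumes entries of the form `code = 'label'` (codes optionally quoted) and uses a short illustrative excerpt written in that style; `parse_sas_value_block` is a hypothetical name.

```python
import re

# Matches lines such as:  236 =  'AFGHANISTAN'   or   'ALC' = 'ALCAN, AK'
ENTRY = re.compile(r"^\s*'?(?P<code>[^'=]+?)'?\s*=\s*'(?P<label>[^']*)'")


def parse_sas_value_block(text: str) -> dict:
    """Return {code: label} for every `code = 'label'` line in `text`.

    Lines that do not match (the `value ...` header, the closing `;`,
    blank lines) are skipped; whitespace around codes and labels is stripped.
    """
    mapping = {}
    for line in text.splitlines():
        match = ENTRY.match(line)
        if match:
            mapping[match.group("code").strip()] = match.group("label").strip()
    return mapping


# Illustrative excerpt in the style of the labels file
excerpt = """
value i94cntyl
   236 =  'AFGHANISTAN'
   101 =  'ALBANIA'
;
value $i94prtl
   'ALC' = 'ALCAN, AK             '
;
"""
print(parse_sas_value_block(excerpt))
```

For a tabular form, the result can be passed to `pd.DataFrame(list(mapping.items()), columns=["code", "description"])`.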


In [None]:
df_temp.isnull().sum()

dt                                    0
AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
dtype: int64

In [None]:
df_temp_us_city.isnull().sum()

dt                                 0
AverageTemperature               120
AverageTemperatureUncertainty    120
City                               0
Country                            0
Latitude                           0
Longitude                          0
dtype: int64

In [None]:
df_im.isnull().sum()

cicid             0
i94yr             0
i94mon            0
i94cit            0
i94res            0
i94port           0
arrdate           0
i94mode         239
i94addr      152372
depdate      142457
i94bir          802
i94visa           0
count             0
dtadfile          1
visapost    1881250
occup       3088187
entdepa         238
entdepd      138429
entdepu     3095921
matflag      138429
biryear         802
dtaddto         477
gender       414269
insnum      2982605
airline       83627
admnum            0
fltno         19549
visatype          0
dtype: int64

In [None]:
df_dem.isnull().sum()

City                       0
State                      0
Median Age                 0
Male Population            3
Female Population          3
Total Population           0
Number of Veterans        13
Foreign-born              13
Average Household Size    16
State Code                 0
Race                       0
Count                      0
dtype: int64

#### Cleaning Steps
The steps needed to clean the data are documented below.


- I94_SAS_Labels_Descriptions.SAS file organization:
`data_prep.py` converts all descriptions into tabular format so that they can be used later to describe the coded fields.

- I94 immigration data source organization:
`data_prep.py` converts the sas7bdat files into Apache Parquet format and combines them.

- City temperature data source organization:
`data_prep.py` converts the CSV file into Apache Parquet format, partitioned by city and month. This makes it possible to relate the temperature data to the other data sources.
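Relating temperatures to the other sources by city and month requires reducing the monthly rows to one value per city and calendar month. The pandas sketch below shows that aggregation under stated assumptions: it uses the `dt`, `City` and `AverageTemperature` columns seen in Step 1, borrows the output names `city`, `month` and `avg_temp` from the Step 5 query, and upper-cases city names only because the query output shows names such as `MIAMI`. It is not the project's `data_prep.py` logic.

```python
import pandas as pd


def monthly_city_temperatures(df: pd.DataFrame) -> pd.DataFrame:
    """Average AverageTemperature per (city, calendar month) across all years.

    Rows with a missing temperature are dropped first.
    """
    clean = df.dropna(subset=["AverageTemperature"]).copy()
    # dt is stored as text such as "1820-01-01"; extract the month number
    clean["month"] = pd.to_datetime(clean["dt"]).dt.month
    # Upper-case city names to match keys used elsewhere (assumption)
    clean["city"] = clean["City"].str.upper()
    return (
        clean.groupby(["city", "month"], as_index=False)["AverageTemperature"]
        .mean()
        .rename(columns={"AverageTemperature": "avg_temp"})
    )


# Illustrative rows; the None mimics a missing temperature reading
sample = pd.DataFrame({
    "dt": ["1820-05-01", "1821-05-01", "1822-05-01", "1820-06-01"],
    "City": ["Abilene", "Abilene", "Abilene", "Abilene"],
    "AverageTemperature": [21.0, 23.0, None, 25.0],
})
print(monthly_city_temperatures(sample))
```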

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The conceptual data model uses a star schema. The I94 nonimmigrant data provides personal information about nonimmigrants.\
In addition, temperature information related to the port city and arrival date is added to the data warehouse on AWS Redshift as a dimension table.\
The conceptual data model is shown below.\
\
![conceptual data model](images/cmdkpeg.JPG)

#### 3.2 Mapping Out Data Pipelines
In the pipelines, a template of the modelled database is created first. \
The necessary data is then extracted from the data lake and transformed into the model tables.\
The data warehouse is kept in AWS Redshift for further processing.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
1- Clean and prepare the big data from the data lake with the `data_prep.py` script. To perform this operation, check that the IAM role has the default EMR permissions. Then create an EMR cluster from the AWS UI or the AWS CLI and submit the script as a Spark job.\
2- As can be seen in `airflow/dag/capstone_spark_dag.py`, a DAG was built to process the data\
between the stated time intervals. The DAG also runs the pipeline every day at 7 am.\
3- The relational database tables are created with `airflow/dag/capstone_create_tables.sql` and the *PostgresOperator*.\
4- In the Redshift cluster, the data is staged into Redshift via the *StagetoRedshiftOperator*.\
5- The *LoadDimensionOperator* inserts the dimension tables shown in the conceptual model, using the queries in `airflow/plugins/helpers/capstonesql_queries.py`.\
6- The *LoadFactOperator* creates the `facti94` fact table.
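The custom *StagetoRedshiftOperator* is not listed in this notebook. Staging operators of this kind commonly wrap a Redshift `COPY` command; the sketch below only builds such a statement for Parquet input, as an assumption about how staging could work. The function name, table name, S3 prefix and role ARN are hypothetical placeholders.

```python
def build_copy_sql(table: str, s3_path: str, iam_role_arn: str) -> str:
    """Build a Redshift COPY statement that loads Parquet files from S3.

    Hypothetical helper: a staging operator could render this string and
    execute it through a Postgres/Redshift connection.
    """
    if not s3_path.startswith("s3://"):
        raise ValueError("s3_path must be an s3:// URI")
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS PARQUET;"
    )


print(build_copy_sql(
    "staging_i94",                                    # hypothetical table name
    "s3://my-bucket/sas_data/apr16/",                 # hypothetical S3 prefix
    "arn:aws:iam::123456789012:role/myRedshiftRole",  # placeholder role ARN
))
```

Parquet was chosen here only because `data_prep.py` writes Parquet; CSV input would need different `COPY` options.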

Airflow DAG task steps can be seen below.

<img src="images/airflow_DAG.JPG" alt="airflow_dag" class="bg-primary" width="2000px" height="2000px">


#### 4.2 Data Quality Checks
Data quality is checked by the *run_quality_checks* task, which uses the *DataQualityOperator*. This task step checks the data for skewness and malformation.\
In addition, the EMR Spark job script was tested locally on sample data.
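The *DataQualityOperator* itself is not shown here. A common pattern for such checks is to run SQL probes and report failures when a table is empty or a key column contains NULLs. The sketch below illustrates that pattern against an in-memory SQLite database standing in for Redshift; the function name and the toy rows are assumptions, while the table and column names are taken from the Step 5 query.

```python
import sqlite3


def run_quality_checks(conn, checks):
    """Run (table, key_column) checks and return a list of failure messages.

    Each table must contain at least one row and no NULLs in its key column.
    Names are interpolated into SQL, so they must come from trusted config.
    """
    failures = []
    for table, key_column in checks:
        (row_count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
        if row_count == 0:
            failures.append(f"{table}: no rows")
            continue
        (null_count,) = conn.execute(
            f"SELECT COUNT(*) FROM {table} WHERE {key_column} IS NULL"
        ).fetchone()
        if null_count:
            failures.append(f"{table}.{key_column}: {null_count} NULL value(s)")
    return failures


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ports (iata_code TEXT, city TEXT)")
conn.execute("CREATE TABLE demographics (city TEXT, foreignborn_pop INTEGER)")
# Toy rows: one port is missing its code, demographics is left empty
conn.executemany("INSERT INTO ports VALUES (?, ?)", [("ALB", "ALBANY"), (None, "UNKNOWN")])
print(run_quality_checks(conn, [("ports", "iata_code"), ("demographics", "city")]))
```

In an Airflow operator, a non-empty failure list would typically be turned into a raised exception so that the task fails.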

#### 4.3 Data dictionary 
The data dictionary is presented below.
<img src="images/factdictionary.JPG" alt="dictionary" class="bg-primary" width="2000px" height="2000px">
<img src="images/personali94dict.JPG" alt="dictionary" class="bg-primary" width="2000px" height="2000px">
<img src="images/citytemperaturesdict.JPG" alt="dictionary" class="bg-primary" width="2000px" height="2000px">
<img src="images/portsdict.JPG" alt="dictionary" class="bg-primary" width="2000px" height="2000px">
<img src="images/demographicsdict.JPG" alt="dictionary" class="bg-primary" width="2000px" height="2000px">
<img src="images/datedict.JPG" alt="dictionary" class="bg-primary" width="2000px" height="2000px">


### Step 5: Complete Project Write Up
* To handle big data (although the data in this project may not strictly count as big data), the project relies on Spark's lazy evaluation and the processing speed of AWS EMR. Because the ETL process has to run on a regular cadence and support backfilling, Apache Airflow is used for workflow orchestration.
* To show the usefulness of the resulting relational database, the following SQL query was executed on Redshift as an analytics example. This OLAP query shows which cities receive the most nonimmigrant visitors and how each city's average temperature in a given month (May in this case) relates to its population.

        SELECT z.City, n.foreignborn_pop, COUNT(z.city) AS nonimmigrant_pop, n.avg_temp AS avg_temperature
        FROM (SELECT p.city, i.arrival_month FROM personali94 i INNER JOIN ports p ON i.port_code = p.iata_code) AS z
        LEFT JOIN (SELECT d.city, d.foreignborn_pop, t.avg_temp FROM demographics d INNER JOIN citytemperatures t ON d.city = t.city WHERE t.month = 5) AS n
        ON z.city = n.city
        WHERE n.foreignborn_pop IS NOT NULL
        GROUP BY z.city, n.foreignborn_pop, n.avg_temp
        ORDER BY nonimmigrant_pop DESC;

* The result of this query was saved as the `result_sample` Parquet file and is shown in the cell below.
* The ETL process should run monthly: although nonimmigrant visitors arrive in the U.S. every day, the most dynamic dataset, the I94 nonimmigrant data, is updated on a monthly basis.
* If the data increased by 100x, the same approach could still be used. However, the EMR cluster configuration would need to be scaled up to a larger cluster. In addition, the EMR job flow could be integrated into the Airflow DAG to avoid unnecessary delays when processing large amounts of data.
* Airflow's scheduling feature can run the ETL so that the data is ready by 7 am every morning.
* According to this [post](https://stackoverflow.com/questions/29764808/maximum-number-of-user-per-cluster-in-redshift), a Redshift cluster can be accessed by more than 10,000 users, so user access is not a limiting concern for this project.


In [None]:
spark.read.parquet("query_result").distinct().orderBy(F.desc("nonimmigrant_pop")).limit(10).toPandas()

Unnamed: 0,city,foreignborn_pop,nonimmigrant_pop,avg_temperature
0,MIAMI,260789,1946745,25.802071
1,LOS ANGELES,1485425,1016975,18.152643
2,NEW ORLEANS,21679,844075,25.003643
3,ORLANDO,50558,807850,25.584786
4,HOUSTON,696210,580600,25.068071
5,ATLANTA,32016,531405,19.413214
6,DALLAS,326825,443870,23.173643
7,BOSTON,190123,419630,13.905714
8,SEATTLE,119840,264340,9.989214
9,DETROIT,39861,218090,14.864143
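To experiment with the Step 5 query without a Redshift cluster, the tables it touches can be mocked locally. The sketch below is only an illustration: SQLite stands in for Redshift (the query uses standard SQL that both accept), the columns are limited to those the query references, and all rows are invented.

```python
import sqlite3

QUERY = """
SELECT z.city, n.foreignborn_pop, COUNT(z.city) AS nonimmigrant_pop, n.avg_temp AS avg_temperature
FROM (SELECT p.city, i.arrival_month
      FROM personali94 i INNER JOIN ports p ON i.port_code = p.iata_code) AS z
LEFT JOIN (SELECT d.city, d.foreignborn_pop, t.avg_temp
           FROM demographics d INNER JOIN citytemperatures t ON d.city = t.city
           WHERE t.month = 5) AS n
  ON z.city = n.city
WHERE n.foreignborn_pop IS NOT NULL
GROUP BY z.city, n.foreignborn_pop, n.avg_temp
ORDER BY nonimmigrant_pop DESC;
"""

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE personali94 (port_code TEXT, arrival_month INTEGER);
CREATE TABLE ports (iata_code TEXT, city TEXT);
CREATE TABLE demographics (city TEXT, foreignborn_pop INTEGER);
CREATE TABLE citytemperatures (city TEXT, month INTEGER, avg_temp REAL);
""")
# Invented rows: three arrivals via MIA (one in June), one via BOS,
# and one via a port with no match in the ports table
conn.executemany("INSERT INTO personali94 VALUES (?, ?)",
                 [("MIA", 5), ("MIA", 5), ("MIA", 6), ("BOS", 5), ("XXX", 5)])
conn.executemany("INSERT INTO ports VALUES (?, ?)", [("MIA", "MIAMI"), ("BOS", "BOSTON")])
conn.executemany("INSERT INTO demographics VALUES (?, ?)", [("MIAMI", 100), ("BOSTON", 50)])
conn.executemany("INSERT INTO citytemperatures VALUES (?, ?, ?)",
                 [("MIAMI", 5, 25.8), ("BOSTON", 5, 13.9), ("BOSTON", 1, -1.0)])
rows = conn.execute(QUERY).fetchall()
print(rows)
```

Running this shows that, as written, the query restricts only the temperature rows to May: the June arrival through MIA is still counted. Adding `WHERE i.arrival_month = 5` to the first subquery would limit the count to May arrivals.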


#### Note
At the beginning, the data was assumed to be stored as objects in S3:
- i94 data: `[S3_Bucket_Source]/../../data/18-83510-I94-Data-{year}/*.sas7bdat`
- temp data: `[S3_Bucket_Source]/../../data2/GlobalLandTemperaturesByCity.csv`
- port data: `[S3_Bucket_Source]/airport-codes_csv.csv`
- demographics data: `[S3_Bucket_Source]/us-cities-demographics.csv`
- descriptions data: `[S3_Bucket_Source]/94_SAS_Labels_Descriptions.SAS`
- configuration_file data: `[S3_Bucket_Source]/config.cfg`