# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import configparser
import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, udf, col, min, max, when
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import SQLContext
import psycopg2
import boto3

In [2]:
config = configparser.ConfigParser()
config.read('dwh.cfg')

['dwh.cfg']

In [3]:
os.environ['AWS_ACCESS_KEY_ID']=config['keypair']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['keypair']['AWS_SECRET_ACCESS_KEY']

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

In [4]:
# Read in the data here
fname = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
df = pd.read_sas(fname, 'sas7bdat', encoding="ISO-8859-1")

In [5]:
df.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,6.0,2016.0,4.0,692.0,692.0,XXX,20573.0,,,,...,U,,1979.0,10282016,,,,1897628000.0,,B2
1,7.0,2016.0,4.0,254.0,276.0,ATL,20551.0,1.0,AL,,...,Y,,1991.0,D/S,M,,,3736796000.0,296.0,F1
2,15.0,2016.0,4.0,101.0,101.0,WAS,20545.0,1.0,MI,20691.0,...,,M,1961.0,09302016,M,,OS,666643200.0,93.0,B2
3,16.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,1988.0,09302016,,,AA,92468460000.0,199.0,B2
4,17.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MA,20567.0,...,,M,2012.0,09302016,,,AA,92468460000.0,199.0,B2


In [6]:
df_filter = df[(df.visatype)=='B1']
df_filter.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
5,18.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,MI,20555.0,...,,M,1959.0,9302016,,,AZ,92471040000.0,602,B1
9,22.0,2016.0,4.0,101.0,101.0,NYC,20545.0,1.0,NY,20562.0,...,,M,1968.0,9302016,,,AZ,92478490000.0,608,B1
12,27.0,2016.0,4.0,101.0,101.0,BOS,20545.0,1.0,MA,20549.0,...,,M,1958.0,4062016,M,,LH,92478760000.0,422,B1
13,28.0,2016.0,4.0,101.0,101.0,ATL,20545.0,1.0,MA,20549.0,...,,M,1960.0,4062016,F,,LH,92478900000.0,422,B1
24,40.0,2016.0,4.0,101.0,101.0,CHI,20545.0,1.0,IL,20554.0,...,,M,1981.0,9302016,M,,OS,92480560000.0,65,B1


In [7]:
	
from pyspark.sql import SparkSession
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.2")\
.enableHiveSupport().getOrCreate()

In [17]:
df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

In [9]:
#write to parquet
#df_spark.write.parquet("sas_data_parquet", "overwrite")
#df_spark.write.partitionBy("i94yr","i94mon").parquet("sas_data_parquet_partition", "overwrite")
df_spark=spark.read.parquet("sas_data_parquet_clean/")

In [10]:
#df_filter = df_spark[(df_spark.visatype)=='B1']
#df_filter.head()
#Check distinct visa types to check if there are any nulls as this would be used to draw metrics on immigrant type
df_spark.select('visatype').distinct().show()

+--------+
|visatype|
+--------+
|      F2|
|     GMB|
|      B2|
|      F1|
|     CPL|
|      I1|
|      WB|
|      M1|
|      B1|
|      WT|
|      M2|
|      CP|
|     GMT|
|      E1|
|       I|
|      E2|
|     SBP|
+--------+



In [5]:
# create timestamp column from original timestamp column
epoch = datetime.datetime(1960, 1, 1)
get_datetime = udf(lambda x: epoch + datetime.timedelta(days=int(x)))
df=df_spark.select(get_datetime(df_spark.arrdate).alias("Arrival_Date"), df_spark.arrdate)

df.head()
#From the above it seems like Mar 30, 2016 has code 20574.So removing 90 days from this would be 20484 which should be 
# jan 1, 2016
#test_date_id = 20484
test_date_id = 20454
print((epoch + datetime.timedelta(days=test_date_id)).year, (epoch + datetime.timedelta(days=test_date_id)).month,(epoch + datetime.timedelta(days=test_date_id)).day)
# Using 20454 as start date id, dim_date can be created from 2016-2020 with date_id+1 and counting down to 1900-1-1 as best practice

2016 1 1


In [58]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, TimestampType
#from pyspark.sql.functions import to_date
import pyspark.sql.functions as f


epoch = datetime.datetime(1960, 1, 1)
start_date = datetime.datetime(2016,1,1)
#end_date = datetime.datetime(2020,12,31)
end_date = datetime.datetime(2016,1,10)
date_id = 20454
interval = datetime.timedelta(days =1)

#epoch = to_date("1960-01-01")
#start_date =to_date("2016-01-01")
#end_date = to_date("2016-12-31")
#num_days =1

schema = StructType([StructField("calendar_date", DateType(), False),
                     StructField("date_id", IntegerType(), False),
                     StructField("day", IntegerType(), False),
                     StructField("month", IntegerType(), False),
                     StructField("year", IntegerType(), False),
                     StructField("weekday", IntegerType(), False)]
                     )
#columns = ['calendar_date', 'date_id', 'day', 'month', 'year', 'weekday']
df = spark.createDataFrame([], schema)
#start_date = start_date.strftime("%m/%d/%Y %H:%M")
print(start_date)
while start_date <= end_date:
    #cur.execute(date_dim_insert, [start_date, date_id, start_date.day, start_date.month, start_date.year, start_date.weekday()])
    #conn.commit()
    start_date_ts = f.to_date(start_date.strftime("%m/%d/%Y"))
    print(start_date_ts)
    newRow = [start_date_ts, date_id, start_date.day, start_date.month, start_date.year, start_date.weekday()]
    temp_df = spark.createDataFrame(newRow, schema)
    df = df.union(newRow)
    date_id= date_id+1
    start_date = start_date+interval
#    start_date=date_add(start_date, num_days)

df.show()

2016-01-01 00:00:00
Column<b'to_date(`01/01/2016`)'>


ValueError: field calendar_date: This field is not nullable, but got None

In [14]:
#Use this section to load all the necessary files to S3 for further processing
s3 = boto3.resource('s3')
BUCKET = "capstoneimmi"

#Load demographic data to S3
s3.Bucket(BUCKET).upload_file("us-cities-demographics.csv", "demo/us-cities-demographics.csv")

#Load I94_Port data to S3
s3.Bucket(BUCKET).upload_file("I94_Port.csv", "port/I94_Port.csv")

#Load I94_Mode data to S3
s3.Bucket(BUCKET).upload_file("I94_Mode.csv", "mode/I94_Mode.csv")

#Load I94_Visa data to S3
s3.Bucket(BUCKET).upload_file("I94_Visa.csv", "visa/I94_Visa.csv")

#Load I94ADDR_State data to S3
s3.Bucket(BUCKET).upload_file("I94ADDR_State.csv", "addrstate/I94ADDR_State.csv")

#Load I94City_Res data to S3
s3.Bucket(BUCKET).upload_file("I94City_Res.csv", "rescitycntry/I94City_Res.csv")

In [24]:
#Load immi data to S3
#df_spark.write.parquet("s3a://capstoneimmi/sas-data-parquet/", mode="overwrite")

#from glob import glob
#filenames = glob('sas_data_parquet/part*.parquet')
#for f in filenames:
#    df=pd.append.parquet("sas_data_parquet")

#df_spark.write.parquet("sas_data_parquet", "overwrite")
df_spark.write.parquet("s3a://capstoneimmi/sas-data-parquet/", mode="overwrite")
#df_spark=spark.read.parquet("sas_data_parquet")

In [7]:
#May not use this
fname = 'us-cities-demographics.csv'
df = pd.read_csv(fname, delimiter=';')
df.head()

# Some of the data elements in this file is a snapshot in time as they are subject to change over time. For example, population, number of veterans, foreign born,etc. 
# So we will use distinct state_code, city, state to create dim_city_state. State_code, city with other elements could be used to create a snapshot fact if date/time element was in the data. 
# However, since the data doesn't have a date element, we can create a demographic_ref_dim.

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [8]:
fname = 'I94_Port.csv'
df = pd.read_csv(fname, delimiter=';')
df.head()
#remove single quotes from value and i94prtl so that it can be loaded into the db
df['value'] = df['value'].str.replace(r"[\',]", '')
#print(df)
df['i94prtl'] = df['i94prtl'].str.replace(r"[\']",'')

print(df)

                  value                      i94prtl
0                   ALC                     ALCAN,AK
1                   ANC                 ANCHORAGE,AK
2                   BAR      BAKERAAF-BAKERISLAND,AK
3                   DAC              DALTONSCACHE,AK
4                   PIZ        DEWSTATIONPTLAYDEW,AK
5                   DTH               DUTCHHARBOR,AK
6                   EGL                     EAGLE,AK
7                   FRB                 FAIRBANKS,AK
8                   HOM                     HOMER,AK
9                   HYD                     HYDER,AK
10                  JUN                    JUNEAU,AK
11                  5KE                 KETCHIKAN,AK
12                  KET                 KETCHIKAN,AK
13                  MOS    MOSESPOINTINTERMEDIATE,AK
14                  NIK                   NIKISKI,AK
15                  NOM                       NOM,AK
16                  PKC                POKERCREEK,AK
17                  ORI              PORTLIONS

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [11]:
# Performing cleaning tasks here
df_spark=spark.read.parquet("sas_data_parquet")
df_spark.createOrReplaceTempView("immi_fact")

#df_count = spark.sql("""select count(*) from immi_fact""")
#df_count.show()
#i94 year is used to analyze number of arrivals per year so checking if this column has any null values
#df_yr = spark.sql(""" select count(*) from immi_fact where i94yr is null""")
#df_yr.show()

#i94port is used to count arrivals per port per year so checking if there are any nulls here
#port_df = spark.sql("""select count(*) as null_port from immi_fact where i94port is null""")
#port_df.show()

#check if cicid is unique
#df_cicid = spark.sql("""select count(*) as dup_cicid_count from immi_fact group by cicid having count(*) > 2""")
#df_cicid.show()

#2982605 are null which is majority rows so not using this column in the fact 
#insnum_df = spark.sql("""select count(*) as null_insnum from immi_fact where insnum is null""")
#insnum_df.show()

#data model will be using i94addr to join to dim to find the number of arrivals so checking if this is null
#filter rows from immi_fact where i94addr is null
#i94addr_chk = spark.sql("""select count(*) as i94addr_chk from immi_fact where i94addr is null""")
#i94addr_chk.show()

#to analyze the type of immigrants arriving to a city, we can use i94visa column. check for distinct values in this row
#i94visa_chk = spark.sql(""" select distinct i94visa from immi_fact""")
#i94visa_chk.show()

#to analyze the type of immigrants arriving to a city, we can use i94visa column. check for nulls in this row
#i94visa_null = spark.sql(""" select count(*) as null_visa_chk from immi_fact where  i94visa is null""")
#i94visa_null.show()

#check valid dates
i94_invalid_date = spark.sql(""" select distinct dtaddto from immi_fact
                                    where length(TRIM(dtaddto)) < 8 or
                                     substring(DTADDTO, 1,2) NOT IN ('01', '02', '03',
                                      '04', '05','06', '07',
                                      '08', '09', '10', '11', '12')
                                      or 
                                      substring(DTADDTO, 3,2) NOT IN ('01', '02', '03',
                                      '04', '05','06', '07',
                                      '08', '09', '10', '11', '12',
                                      '11', '12', '13',
                                      '14', '15','16', '17',
                                      '18', '19', '20', '21', '22',
                                      '23','24', '25','26', '27',
                                      '28', '29', '30', '31')
                                      or                                      
                                     substring(DTADDTO, 5,2) NOT IN ('20')
                                    """)
i94_invalid_date.show(15)

i94_valid_date = spark.sql("""SELECT DISTINCT CICID, I94YR, I94MON, I94CIT,I94RES, I94PORT,ARRDATE,I94MODE, 
                            I94ADDR, DEPDATE, I94BIR, I94VISA, COUNT, 
                            DTADFILE,
                            VISAPOST, OCCUP, ENTDEPA, ENTDEPD, ENTDEPU, MATFLAG, BIRYEAR, 
                            CASE WHEN TRIM(dtaddto) IN ('183','10 02003','D/S','06 02002','/   183D','12319999') 
                            THEN NULL 
                            ELSE dtaddto END as DTADDTO, GENDER, INSNUM, AIRLINE, ADMNUM, FLTNO, VISATYPE
                            FROM immi_fact """)
#i94_valid_date = i94_invalid_date.withColumn("dtaddto", function(when((function(col(function(TRIM("dtaddto")))) =='D/S', 'null'))).otherwise(function(col("dtaddto"))))

i94_valid_date.show()

i94_valid_date.write.parquet("sas_data_parquet_clean", "overwrite")
#attempt to write clean data to s3
#i94_valid_date.write.parquet("s3a://capstoneimmi/sas-data-parquet/", "overwrite")

+--------+
| dtaddto|
+--------+
|     183|
|10 02003|
|     D/S|
|06 02002|
|/   183D|
|12319999|
+--------+

+------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
| CICID| I94YR|I94MON|I94CIT|I94RES|I94PORT|ARRDATE|I94MODE|I94ADDR|DEPDATE|I94BIR|I94VISA|COUNT|DTADFILE|VISAPOST|OCCUP|ENTDEPA|ENTDEPD|ENTDEPU|MATFLAG|BIRYEAR| DTADDTO|GENDER|INSNUM|AIRLINE|         ADMNUM|FLTNO|VISATYPE|
+------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
| 100.0|2016.0|   4.0| 103.0| 103.0|    DAL|20545.0|    1.0|     TX|20560.0|  31.0|    2.0|  1.0|20160401|    null| null|      O|      O|   null|      M| 1985.0|06292016|  null|  null|     BA|5.5447764233E10|00193|   

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

It needs the following dimensions to support the Fact
1. Date_Dim
2. City_State_Demographic_Dim
3. Visa_Type_Dim
4. Port_Dim
5. City_Res_Dim
6. Addr_State_Dim
7. Immigrant_Fact

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
1. Using the SAS integer dates provided, see date_dimension
2. After going over the I94_SAS_Labels_Description, I created csv files for Visa_Type, Port, City_Res based on the value and code provided. I will use these CSV files to create the corresponding dimensions tables and load the data.
3. Once the dimensions are created, create the fact table based on the metrics to be run and populate the table with data.

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [12]:
conn = psycopg2.connect("host=127.0.0.1 dbname=studentdb user=student password=student")
conn.set_session(autocommit=True)
cur = conn.cursor()

# create immigration database with UTF8 encoding
cur.execute("DROP DATABASE IF EXISTS dw_immi")
cur.execute("CREATE DATABASE dw_immi WITH ENCODING 'utf8' TEMPLATE template0")

# close connection to default database
conn.close()

In [13]:
# connect to immigration database
conn = psycopg2.connect("host=127.0.0.1 dbname=dw_immi user=student password=student")
cur = conn.cursor()

In [11]:
# Create dimensions and load data into tables
#Date_Dim is one of the dimensions that will be used across the database for joining and analyzing data
#so seeding this dimension from beginning of time to a high end date helps with analysis

#As analyzed above, for arrival and departure dates, the source data uses sas integer date. So using the integer as
#date_id, I'm seeding data into the dimension table

Date_Dim_drop = "DROP TABLE IF EXISTS Date_Dim"

Date_Dim_create = ("""CREATE TABLE IF NOT EXISTS Date_Dim(calendar_date TIMESTAMP(6), date_id INTEGER, 
     day INTEGER, month INTEGER, year INTEGER, weekday INTEGER, PRIMARY KEY(calendar_date))
""")

cur.execute(Date_Dim_drop)
conn.commit()

cur.execute(Date_Dim_create)
conn.commit()

Date_Dim_insert = ("""INSERT INTO Date_Dim(calendar_date, date_id, day, month, year, weekday) VALUES( %s, %s, %s, %s, %s, %s) ON CONFLICT(calendar_date) DO NOTHING
""")
epoch = datetime.datetime(1960, 1, 1)
start_date = datetime.datetime(2016,1,1)
end_date = datetime.datetime(2020,12,31)
date_id = 20454
interval = datetime.timedelta(days =1)

while start_date <= end_date:
    cur.execute(Date_Dim_insert, [start_date, date_id, start_date.day, start_date.month, start_date.year, start_date.weekday()])
    conn.commit()
    date_id= date_id+1
    start_date = start_date+interval

start_date = datetime.datetime(2015,12,31)
end_date = datetime.datetime(1900,1, 1)
date_id = 20453
interval = datetime.timedelta(days = 1)

while start_date >= end_date:
    cur.execute(Date_Dim_insert, [start_date, date_id, start_date.day, start_date.month, start_date.year, start_date.weekday()])
    conn.commit()
    date_id= date_id-1
    start_date = start_date-interval

In [12]:
cur.execute("select * from Date_Dim where date_id < -2000 limit 10")
records = cur.fetchall()
for record in records:
    print(record)
    #print("start_date ", record[0])
    #print("date_id ", record[1])
    #print("day ", record[2])

(datetime.datetime(1954, 7, 10, 0, 0), -2001, 10, 7, 1954, 5)
(datetime.datetime(1954, 7, 9, 0, 0), -2002, 9, 7, 1954, 4)
(datetime.datetime(1954, 7, 8, 0, 0), -2003, 8, 7, 1954, 3)
(datetime.datetime(1954, 7, 7, 0, 0), -2004, 7, 7, 1954, 2)
(datetime.datetime(1954, 7, 6, 0, 0), -2005, 6, 7, 1954, 1)
(datetime.datetime(1954, 7, 5, 0, 0), -2006, 5, 7, 1954, 0)
(datetime.datetime(1954, 7, 4, 0, 0), -2007, 4, 7, 1954, 6)
(datetime.datetime(1954, 7, 3, 0, 0), -2008, 3, 7, 1954, 5)
(datetime.datetime(1954, 7, 2, 0, 0), -2009, 2, 7, 1954, 4)
(datetime.datetime(1954, 7, 1, 0, 0), -2010, 1, 7, 1954, 3)


In [15]:
#Use distinct state_code, city, state, total population, foreign born to create dim_demograph_city_state
fname = 'us-cities-demographics.csv'
df = pd.read_csv(fname, delimiter=';')
df.head()

demographic_dim_drop = "DROP TABLE IF EXISTS CITY_STATE_DEMOGRAPHIC_DIM"

demographic_dim_create = ("""CREATE TABLE IF NOT EXISTS CITY_STATE_DEMOGRAPHIC_DIM(
    CITY VARCHAR,
    STATE_CODE VARCHAR,
    STATE VARCHAR,
    MEDIAN_AGE NUMERIC(7,4),
    MALE_POP NUMERIC(20,4),
    FEMALE_POP NUMERIC(20,4),
    TOTAL_POP NUMERIC(20,4),
    NUM_VETERANS NUMERIC(20,4),
    FOREIGN_BORN NUMERIC(20,4),
    AVG_HOUSEHOLD_SIZE NUMERIC(7,3),
    RACE VARCHAR,
    COUNT NUMERIC(20,4),
    PRIMARY KEY(CITY, STATE_CODE)
)
""")

cur.execute(demographic_dim_drop)
conn.commit()

cur.execute(demographic_dim_create)
conn.commit()

demo_dim_insert = ("""INSERT INTO CITY_STATE_DEMOGRAPHIC_DIM(CITY, STATE_CODE,STATE,MEDIAN_AGE, MALE_POP,FEMALE_POP, TOTAL_POP,
                    NUM_VETERANS, FOREIGN_BORN, AVG_HOUSEHOLD_SIZE, RACE, COUNT
                    ) VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) 
                    ON CONFLICT(CITY, STATE_CODE) DO UPDATE
                        SET MEDIAN_AGE = EXCLUDED.MEDIAN_AGE,
                            MALE_POP = EXCLUDED.MALE_POP,
                            FEMALE_POP = EXCLUDED.FEMALE_POP,
                            TOTAL_POP = EXCLUDED.TOTAL_POP,
                            NUM_VETERANS = EXCLUDED.NUM_VETERANS,
                            FOREIGN_BORN = EXCLUDED.FOREIGN_BORN,
                            AVG_HOUSEHOLD_SIZE = EXCLUDED.AVG_HOUSEHOLD_SIZE,
                            RACE = EXCLUDED.RACE,
                            COUNT = EXCLUDED.COUNT
""")
 
demo_df = df[['City', 'State Code', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'Race', 'Count']]
for i, row in demo_df.iterrows():
    cur.execute(demo_dim_insert, row)

In [16]:
#cur.execute("select * from CITY_STATE_DIM limit 5")
#records = cur.fetchall()
#for record in records:
#    print("CITY ", record[0])
#    print("STATE ", record[1])
#    print("STATE_CODE ", record[2])
    
#check if duplicate states got inserted, West Virginia is not in the list of states
#cur.execute("select distinct state from CITY_STATE_DIM order by state")
#records = cur.fetchall()
#for record in records:
#    print("State", record[0])

cur.execute("select * from CITY_STATE_DEMOGRAPHIC_DIM limit 5")
records = cur.fetchall()
for record in records:
    #print(record)
    print("CITY ", record[0])
    print("STATE_CODE ", record[1])
    print("MEDIAN_AGE ", record[3])

CITY  Carolina
STATE_CODE  PR
MEDIAN_AGE  42.0000
CITY  North Little Rock
STATE_CODE  AR
MEDIAN_AGE  33.6000
CITY  Las Cruces
STATE_CODE  NM
MEDIAN_AGE  32.7000
CITY  San Bernardino
STATE_CODE  CA
MEDIAN_AGE  28.5000
CITY  Champaign
STATE_CODE  IL
MEDIAN_AGE  28.7000


In [22]:
#Visa_Type_Dim
fname = 'I94_Visa.csv'
df = pd.read_csv(fname)
df.head()

VISA_TYPE_DIM_drop = "DROP TABLE IF EXISTS VISA_TYPE_DIM"

VISA_TYPE_DIM_Create = ("""CREATE TABLE IF NOT EXISTS VISA_TYPE_DIM(
    VISA_CODE INTEGER,
    VISA_TYPE VARCHAR,
    PRIMARY KEY(VISA_TYPE)
)""")
cur.execute(VISA_TYPE_DIM_drop)
conn.commit()

cur.execute(VISA_TYPE_DIM_Create)
conn.commit()

visa_type_dim_insert = ("""INSERT INTO VISA_TYPE_DIM(VISA_CODE, VISA_TYPE) VALUES(%s, %s) ON CONFLICT(VISA_TYPE) 
                            DO NOTHING
""")

visa_df = df[['value', 'I94Visa']]
for i, row in visa_df.iterrows():
    cur.execute(visa_type_dim_insert, row)

In [23]:
cur.execute("select * from VISA_TYPE_DIM")
records = cur.fetchall()
for record in records:
    print(record)
    #print("CODE ", record[0])
    #print("NAME ", record[1])

(1, 'Business')
(2, 'Pleasure')
(3, 'Student')


In [24]:
#Port_Dim
fname = 'I94_Port.csv'
df = pd.read_csv(fname, delimiter=';')
df.head()

PORT_DIM_drop = "DROP TABLE IF EXISTS PORT_DIM"

PORT_DIM_Create = ("""CREATE TABLE IF NOT EXISTS PORT_DIM(
    PORT_CODE VARCHAR,
    PORT_NAME VARCHAR,
    PRIMARY KEY(PORT_CODE)
)""")
cur.execute(PORT_DIM_drop)
conn.commit()

cur.execute(PORT_DIM_Create)
conn.commit()

port_dim_insert = ("""INSERT INTO PORT_DIM(PORT_CODE, PORT_NAME) VALUES(%s, %s) ON CONFLICT(PORT_CODE) 
                            DO NOTHING
""")
df['value'] = df['value'].str.replace(r"[\',]", '')
#print(df)
df['i94prtl'] = df['i94prtl'].str.replace(r"[\']",'')
#df['i94prtl'] = df['i94prtl'].str.replace(r"[,]",' ')

df.head()
for i, row in df.iterrows():
    cur.execute(port_dim_insert, row)
cur.execute("select * from PORT_DIM limit 10")
records = cur.fetchall()
print("code, name")
for record in records:
    print(record)
    #print("CODE ", record[0])
    #print("NAME ", record[1])

code, name
('ALC', 'ALCAN,AK')
('ANC', 'ANCHORAGE,AK')
('BAR', 'BAKERAAF-BAKERISLAND,AK')
('DAC', 'DALTONSCACHE,AK')
('PIZ', 'DEWSTATIONPTLAYDEW,AK')
('DTH', 'DUTCHHARBOR,AK')
('EGL', 'EAGLE,AK')
('FRB', 'FAIRBANKS,AK')
('HOM', 'HOMER,AK')
('HYD', 'HYDER,AK')


In [None]:
#City_Res_Dim

In [25]:
#addr_state_dim
fname = 'i94ADDR_State.csv'
df = pd.read_csv(fname)
df.head()

addr_state_dim_drop = "DROP TABLE IF EXISTS addr_state_dim"

addr_state_dim_Create = ("""CREATE TABLE IF NOT EXISTS addr_state_dim(
	STATE_CODE VARCHAR,
	STATE VARCHAR,
	PRIMARY KEY(STATE_CODE)
)""")
cur.execute(addr_state_dim_drop)
conn.commit()

cur.execute(addr_state_dim_Create)
conn.commit()

addr_state_dim_insert = ("""INSERT INTO addr_state_dim(state_code, state) VALUES(%s, %s) ON CONFLICT(state_code) 
                            DO NOTHING
""")
for i, row in df.iterrows():
    cur.execute(addr_state_dim_insert, row)
cur.execute("select * from addr_state_dim limit 10")
records = cur.fetchall()
print("state_code, state")
for record in records:
    print(record)

state_code, state
("'AL'", "'ALABAMA'")
("'AK'", "'ALASKA'")
("'AZ'", "'ARIZONA'")
("'AR'", "'ARKANSAS'")
("'CA'", "'CALIFORNIA'")
("'CO'", "'COLORADO'")
("'CT'", "'CONNECTICUT'")
("'DE'", "'DELAWARE'")
("'DC'", "'DIST. OF COLUMBIA'")
("'FL'", "'FLORIDA'")


In [27]:
#Res_City_Country_dim
#I94City_Res.csv
fname = 'I94City_Res.csv'
df = pd.read_csv(fname, delimiter=";")

print(df.columns.tolist())

df['i94cntyl'] = df['i94cntyl'].str.replace(r"[\']", "")
df.head()

Res_City_Country_drop = "DROP TABLE IF EXISTS Res_City_Country_dim"
#
Res_City_Country_Create = ("""CREATE TABLE IF NOT EXISTS Res_City_Country_dim(
	RES_CODE VARCHAR,
	RES_STATE VARCHAR,
	PRIMARY KEY(RES_CODE)
)""")
cur.execute(Res_City_Country_drop)
conn.commit()

cur.execute(Res_City_Country_Create)
conn.commit()

Res_City_Country_dim_insert = ("""INSERT INTO Res_City_Country_dim(RES_CODE, RES_STATE) VALUES(%s, %s) ON CONFLICT(RES_CODE) 
                            DO NOTHING
""")

for i, row in df.iterrows():
    cur.execute(Res_City_Country_dim_insert, row)

cur.execute("select * from Res_City_Country_dim limit 10")
records = cur.fetchall()

print("res_code, res_state")
for record in records:
    print(record)

['value', 'i94cntyl']
res_code, res_state
('582', 'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)')
('236', 'AFGHANISTAN')
('101', 'ALBANIA')
('316', 'ALGERIA')
('102', 'ANDORRA')
('324', 'ANGOLA')
('529', 'ANGUILLA')
('518', 'ANTIGUA-BARBUDA')
('687', 'ARGENTINA ')
('151', 'ARMENIA')


In [17]:
#Fact table to load the SAS i94 file
df_spark=spark.read.parquet("sas_data_parquet")
df_count = spark.sql("""select * from immi_fact limit 1""")
df_count.show()

query =  "DROP TABLE IF EXISTS Immigrant_stage"

cur.execute(query)
conn.commit()

Immigrant_stage_Create = ("""CREATE TABLE IF NOT EXISTS Immigrant_stage(
    cicid BIGINT, i94yr INTEGER, i94mon INTEGER, i94cit INTEGER,
    i94res INTEGER,  i94port VARCHAR, arrdate INTEGER, i94mode INTEGER,
    i94addr VARCHAR, depdate INTEGER, i94bir INTEGER,  i94visa INTEGER,
    count INTEGER, dtadfile INTEGER, visapost VARCHAR, occup VARCHAR,
    entdepa VARCHAR, entdepd VARCHAR, entdepu VARCHAR, matflag VARCHAR,
    biryear INTEGER, dtaddto INTEGER, gender VARCHAR, insnum BIGINT,
    airline VARCHAR,  admnum BIGINT, fltno VARCHAR, visatype VARCHAR)""")

cur.execute(query)
conn.commit()

#df = df_count[['cicid', 'i94yr', 'i94mon','i94cit', 'i94res', 'i94port', 'arrdate','i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count', 'dtadfile', 'visapost', 'entdepa',
#'entdepd', 'entdepu', 'matflag', 'biryear', 'dtaddto', 'gender', 'airline', 'admnum', 'fltno', 'visatype']]


+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292016|     F|  null|     QF|9.495387003E10|00011|      B1|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+--

In [19]:
Immigrant_Fact_drop = "DROP TABLE IF EXISTS Immigrant_Fact"
#
Immigrant_Fact_Create = ("""CREATE TABLE IF NOT EXISTS Immigrant_Fact(
    CICID BIGINT, I94YR INTEGER, I94MON INTEGER, I94CIT INTEGER, I94RES INTEGER,
    I94PORT VARCHAR, ARRDATE INTEGER, I94MODE INTEGER, I94ADDR VARCHAR,
    DEPDATE INTEGER, I94BIR INTEGER, I94VISA INTEGER, COUNT INTEGER,
    DTADFILE BIGINT, VISAPOST VARCHAR, ENTDEPA VARCHAR, ENTDEPD VARCHAR,
    ENTDEPU VARCHAR, MATFLAG VARCHAR, BIRYEAR INTEGER, DTADDTO BIGINT,
    GENDER VARCHAR, AIRLINE VARCHAR, ADMNUM DECIMAL, FLTNO INTEGER,
    VISATYPE VARCHAR, PRIMARY KEY(CICID)
)""")
cur.execute(Immigrant_Fact_drop)
conn.commit()

cur.execute(Immigrant_Fact_Create)
conn.commit()

Immigrant_Fact_insert = ("""INSERT INTO Immigrant_Fact(CICID, I94YR, I94MON,I94CIT,I94RES, I94PORT, ARRDATE,
                            I94MODE, I94ADDR, DEPDATE, I94BIR, I94VISA, COUNT, DTADFILE, VISAPOST, ENTDEPA,
                            ENTDEPD, ENTDEPU, MATFLAG, BIRYEAR, DTADDTO, GENDER, AIRLINE, ADMNUM, FLTNO, VISATYPE 
                        ) VALUES(%s, %s,%s, %s,%s, %s,%s, %s,%s, %s,%s, %s,%s, %s,%s, %s,%s, %s,%s, %s, %s, %s,%s, %s,%s, %s) ON CONFLICT(CICID) 
                            DO NOTHING
""")


conn.close()

In [59]:
cur.execute("select * from Immigrant_Fact limit 10")
records = cur.fetchall()

for record in records:
    print(record)

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
    * I chose Redshift and S3 for this project as it is good to be able to access source data and have a robust RDBMS for loading dimension and fact table so that analysis can be done
* Propose how often the data should be updated and why.
    * If this is for analytical purposes, fact data should be updated at least on a monthly basis
* Write a description of how you would approach the problem differently under the following scenarios:
     * The data was increased by 100x.
     * Make sure that S3 and Redshift capacity is scaled to match the volume of the data
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.  
     * Make sure the source data arrives to the S3 bucket at least 2 hours before the dashboard should be made available
     * Schedule the scripts (in Airflow or native scheduler) to load data to Redshift so that the data loads and quality checks are complete by 7 am
 * The database needed to be accessed by 100+ people.
     * The data is distributed and database can scale to accommodate the number of users.