In [122]:
# Do all imports and installs here
from pyspark.sql.functions import udf
from pyspark.sql.session import SparkSession
import datetime
import pandas as pd
import pyspark.sql.functions as F
import psycopg2
from pyspark.sql.types import IntegerType


In [123]:
spark = SparkSession.builder\
                    .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                    .appName("Project: i94")\
                    .getOrCreate()

In [124]:
sp_sas_data = (spark.read.format("csv")\
                         .options(header="true")\
                         .load("D:/Capstone-Project/Project-Workspace/immigration_data_sample.csv")
              )

In [125]:
sp_sas_data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- cicid: string (nullable = true)
 |-- i94yr: string (nullable = true)
 |-- i94mon: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- count: string (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = tru

In [126]:
# Create a function that returns the desired string from a timestamp 
def convert_5_digit_sasdate_to_yyyymmdd(sasdate):
    epoch = datetime.datetime(1960, 1, 1)
    return (epoch + datetime.timedelta(days=sasdate)).strftime('%Y%m%d')

# Create the UDF
convert_5_digit_sasdate_to_yyyymmdd_udf = udf(lambda x: convert_5_digit_sasdate_to_yyyymmdd(x))

#print(convert_5_digit_sasdate_to_yyyymmdd(20574.0))


In [127]:
sp_df = sp_sas_data
sp_df = sp_df.drop("_c0")
# drop columns that CIC does not use
sp_df = sp_df.drop("dtadfile","visapost","occup","entdepa","entdepd","entdepu","dtaddto")
sp_df = sp_df.drop("i94yr","i94mon","insnum","admnum")
sp_df = sp_df.withColumn("arrdate", sp_df["arrdate"].cast(IntegerType()))
sp_df = sp_df.withColumn("depdate", sp_df["depdate"].cast(IntegerType()))
sp_df = sp_df.withColumnRenamed("arrdate", "arr_yyyymmdd")
sp_df = sp_df.withColumnRenamed("depdate", "dep_yyyymmdd")
sp_df = sp_df.withColumnRenamed("count", "i94count")



In [128]:
sp_df.printSchema()

root
 |-- cicid: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arr_yyyymmdd: integer (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- dep_yyyymmdd: integer (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- i94count: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [129]:
sp_depdate_nn = sp_df.where(F.col("dep_yyyymmdd").isNotNull())
sp_depdate_nn = sp_depdate_nn.withColumn('arr_yyyymmdd', convert_5_digit_sasdate_to_yyyymmdd_udf(sp_depdate_nn['arr_yyyymmdd']))
sp_depdate_nn = sp_depdate_nn.withColumn('dep_yyyymmdd', convert_5_digit_sasdate_to_yyyymmdd_udf(sp_depdate_nn['dep_yyyymmdd']))


In [130]:
sp_depdate_nn.createOrReplaceTempView("depdate_nn")
spark.sql("""
select *
from depdate_nn
where 1 = 1
""").show(5)

+---------+------+------+-------+------------+-------+-------+------------+------+-------+--------+-------+-------+------+-------+-----+--------+
|    cicid|i94cit|i94res|i94port|arr_yyyymmdd|i94mode|i94addr|dep_yyyymmdd|i94bir|i94visa|i94count|matflag|biryear|gender|airline|fltno|visatype|
+---------+------+------+-------+------------+-------+-------+------------+------+-------+--------+-------+-------+------+-------+-----+--------+
|4084316.0| 209.0| 209.0|    HHW|    20160422|    1.0|     HI|    20160429|  61.0|    2.0|     1.0|      M| 1955.0|     F|     JL|00782|      WT|
|4422636.0| 582.0| 582.0|    MCA|    20160423|    1.0|     TX|    20160424|  26.0|    2.0|     1.0|      M| 1990.0|     M|    *GA|XBLNG|      B2|
|1195600.0| 148.0| 112.0|    OGG|    20160407|    1.0|     FL|    20160427|  76.0|    2.0|     1.0|      M| 1940.0|     M|     LH|00464|      WT|
|5291768.0| 297.0| 297.0|    LOS|    20160428|    1.0|     CA|    20160507|  25.0|    2.0|     1.0|      M| 1991.0|     M|  

In [131]:
sp_depdate_n = sp_df.where(F.col("dep_yyyymmdd").isNull())
sp_depdate_n = sp_depdate_n.withColumn('arr_yyyymmdd', convert_5_digit_sasdate_to_yyyymmdd_udf(sp_depdate_n['arr_yyyymmdd']))


In [132]:
sp_depdate_n.createOrReplaceTempView("depdate_n")
spark.sql("""
select *
from depdate_n
where 1 = 1
""").show(5)

+---------+------+------+-------+------------+-------+-------+------------+------+-------+--------+-------+-------+------+-------+-----+--------+
|    cicid|i94cit|i94res|i94port|arr_yyyymmdd|i94mode|i94addr|dep_yyyymmdd|i94bir|i94visa|i94count|matflag|biryear|gender|airline|fltno|visatype|
+---------+------+------+-------+------------+-------+-------+------------+------+-------+--------+-------+-------+------+-------+-----+--------+
| 216657.0| 696.0| 696.0|    FTL|    20160401|    1.0|     FL|        null|  54.0|    2.0|     1.0|   null| 1962.0|     F|     2D|00406|      B2|
|5957654.0| 254.0| 276.0|    SAI|    20160412|    1.0|     GU|        null|  20.0|    2.0|     1.0|   null| 1996.0|     M|     7C|03404|     GMT|
|1435383.0| 574.0| 206.0|    BLA|    20160408|    3.0|     NE|        null|  61.0|    2.0|     1.0|   null| 1955.0|     F|   null|01788|      B2|
|1843262.0| 245.0| 245.0|    CHI|    20160410|    1.0|     IN|        null|  60.0|    2.0|     1.0|   null| 1956.0|     F|  

In [133]:
sp_stg_cic = sp_depdate_nn.union(sp_depdate_n)


In [134]:
sp_stg_cic.createOrReplaceTempView("stg_cic")
spark.sql("""
select count(*)
from stg_cic
where 1 = 1
and dep_yyyymmdd is null
""").show()

+--------+
|count(1)|
+--------+
|      49|
+--------+



In [135]:
sp_stg_cic.printSchema()

root
 |-- cicid: string (nullable = true)
 |-- i94cit: string (nullable = true)
 |-- i94res: string (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arr_yyyymmdd: string (nullable = true)
 |-- i94mode: string (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- dep_yyyymmdd: string (nullable = true)
 |-- i94bir: string (nullable = true)
 |-- i94visa: string (nullable = true)
 |-- i94count: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [136]:
from pyspark.sql.types import IntegerType
sp_stg_cic = sp_stg_cic.withColumn("cicid", sp_stg_cic["cicid"].cast(IntegerType()))
sp_stg_cic = sp_stg_cic.withColumn("i94cit", sp_stg_cic["i94cit"].cast(IntegerType()))
sp_stg_cic = sp_stg_cic.withColumn("i94res", sp_stg_cic["i94res"].cast(IntegerType()))
sp_stg_cic = sp_stg_cic.withColumn("i94mode", sp_stg_cic["i94mode"].cast(IntegerType()))
sp_stg_cic = sp_stg_cic.withColumn("i94bir", sp_stg_cic["i94bir"].cast(IntegerType()))
sp_stg_cic = sp_stg_cic.withColumn("i94visa", sp_stg_cic["i94visa"].cast(IntegerType()))
sp_stg_cic = sp_stg_cic.withColumn("i94count", sp_stg_cic["i94count"].cast(IntegerType()))
sp_stg_cic = sp_stg_cic.withColumn("biryear", sp_stg_cic["biryear"].cast(IntegerType()))
#sp_stg_cic = sp_stg_cic.withColumn("arr_yyyymmdd", sp_stg_cic["arr_yyyymmdd"].cast(IntegerType()))
#sp_stg_cic = sp_stg_cic.withColumn("dep_yyyymmdd", sp_stg_cic["dep_yyyymmdd"].cast(IntegerType()))


In [137]:
sp_stg_cic.show(5)

+-------+------+------+-------+------------+-------+-------+------------+------+-------+--------+-------+-------+------+-------+-----+--------+
|  cicid|i94cit|i94res|i94port|arr_yyyymmdd|i94mode|i94addr|dep_yyyymmdd|i94bir|i94visa|i94count|matflag|biryear|gender|airline|fltno|visatype|
+-------+------+------+-------+------------+-------+-------+------------+------+-------+--------+-------+-------+------+-------+-----+--------+
|4084316|   209|   209|    HHW|    20160422|      1|     HI|    20160429|    61|      2|       1|      M|   1955|     F|     JL|00782|      WT|
|4422636|   582|   582|    MCA|    20160423|      1|     TX|    20160424|    26|      2|       1|      M|   1990|     M|    *GA|XBLNG|      B2|
|1195600|   148|   112|    OGG|    20160407|      1|     FL|    20160427|    76|      2|       1|      M|   1940|     M|     LH|00464|      WT|
|5291768|   297|   297|    LOS|    20160428|      1|     CA|    20160507|    25|      2|       1|      M|   1991|     M|     QR|00739|  

In [138]:
conn = psycopg2.connect("host=127.0.0.1 dbname=i94 user=i94user password=Passw0rd")
cur = conn.cursor()

In [139]:
delete_from_stg_cic = ("""
delete from stg_cic;
""")


In [140]:
sp_stg_cic.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arr_yyyymmdd: string (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- dep_yyyymmdd: string (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- i94count: integer (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [141]:
insert_into_stg_cic = ("""
insert into stg_cic
(
    cicid,
    i94cit,
    i94res,
    i94port,
    arr_yyyymmdd,
    i94mode,
    i94addr,
    dep_yyyymmdd,
    i94bir,
    i94visa,
    i94count,
    matflag,
    biryear,
    gender,
    airline,
    fltno,
    visatype
) values 
(
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s
);
""")

In [142]:
# a function to insert records
def insert_stg_cic_records(cur, stg_cic_df):
    for i, row in stg_cic_df.iterrows():
        #print('i =',i)
        print('row =',row)
        cur.execute(insert_into_stg_cic,row)


In [143]:
# delete records
cur.execute(delete_from_stg_cic)

In [144]:
sp_stg_cic.printSchema()

root
 |-- cicid: integer (nullable = true)
 |-- i94cit: integer (nullable = true)
 |-- i94res: integer (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arr_yyyymmdd: string (nullable = true)
 |-- i94mode: integer (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- dep_yyyymmdd: string (nullable = true)
 |-- i94bir: integer (nullable = true)
 |-- i94visa: integer (nullable = true)
 |-- i94count: integer (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- fltno: string (nullable = true)
 |-- visatype: string (nullable = true)



In [145]:
pd_stg_sic = sp_stg_cic.select("*").toPandas()

In [146]:
pd_stg_sic.head()

Unnamed: 0,cicid,i94cit,i94res,i94port,arr_yyyymmdd,i94mode,i94addr,dep_yyyymmdd,i94bir,i94visa,i94count,matflag,biryear,gender,airline,fltno,visatype
0,4084316,209,209,HHW,20160422,1,HI,20160429,61,2,1,M,1955,F,JL,00782,WT
1,4422636,582,582,MCA,20160423,1,TX,20160424,26,2,1,M,1990,M,*GA,XBLNG,B2
2,1195600,148,112,OGG,20160407,1,FL,20160427,76,2,1,M,1940,M,LH,00464,WT
3,5291768,297,297,LOS,20160428,1,CA,20160507,25,2,1,M,1991,M,QR,00739,B2
4,985523,111,111,CHM,20160406,3,NY,20160409,19,2,1,M,1997,F,,LAND,WT


In [147]:
pd_stg_sic[pd_stg_sic['dep_yyyymmdd'].isnull()]

Unnamed: 0,cicid,i94cit,i94res,i94port,arr_yyyymmdd,i94mode,i94addr,dep_yyyymmdd,i94bir,i94visa,i94count,matflag,biryear,gender,airline,fltno,visatype
951,216657,696,696,FTL,20160401,1,FL,,54,2,1,,1962,F,2D,00406,B2
952,5957654,254,276,SAI,20160412,1,GU,,20,2,1,,1996,M,7C,03404,GMT
953,1435383,574,206,BLA,20160408,3,NE,,61,2,1,,1955,F,,01788,B2
954,1843262,245,245,CHI,20160410,1,IN,,60,2,1,,1956,F,AA,00186,B2
955,4231176,586,586,NYC,20160422,1,NJ,,68,2,1,,1948,,AA,02179,B2
956,4732257,245,245,SFR,20160425,1,CA,,68,2,1,,1948,M,UA,00008,B2
957,1093165,575,575,MIA,20160406,1,FL,,34,2,1,,1982,M,AV,00310,B2
958,5913269,438,438,HHW,20160423,2,,,61,2,1,,1955,F,,,WT
959,510204,258,258,CHI,20160403,1,IL,,59,2,1,,1957,M,TK,5,B2
960,4700630,135,135,NYC,20160425,1,NY,,29,2,1,,1987,F,BA,117,WT


In [148]:
# insert records
insert_stg_cic_records(cur, pd_stg_sic)

row = cicid            4084316
i94cit               209
i94res               209
i94port              HHW
arr_yyyymmdd    20160422
i94mode                1
i94addr               HI
dep_yyyymmdd    20160429
i94bir                61
i94visa                2
i94count               1
matflag                M
biryear             1955
gender                 F
airline               JL
fltno              00782
visatype              WT
Name: 0, dtype: object
row = cicid            4422636
i94cit               582
i94res               582
i94port              MCA
arr_yyyymmdd    20160423
i94mode                1
i94addr               TX
dep_yyyymmdd    20160424
i94bir                26
i94visa                2
i94count               1
matflag                M
biryear             1990
gender                 M
airline              *GA
fltno              XBLNG
visatype              B2
Name: 1, dtype: object
row = cicid            1195600
i94cit               148
i94res               112
i94port    

row = cicid             688605
i94cit               213
i94res               213
i94port              EPI
arr_yyyymmdd    20160404
i94mode                3
i94addr               WA
dep_yyyymmdd    20160405
i94bir                40
i94visa                1
i94count               1
matflag                M
biryear             1976
gender                 M
airline             None
fltno               LAND
visatype              B1
Name: 170, dtype: object
row = cicid            1379698
i94cit               135
i94res               135
i94port              ORL
arr_yyyymmdd    20160408
i94mode                1
i94addr               FL
dep_yyyymmdd    20160422
i94bir                52
i94visa                2
i94count               1
matflag                M
biryear             1964
gender                 M
airline               VS
fltno              00071
visatype              WT
Name: 171, dtype: object
row = cicid            1396100
i94cit               209
i94res               209
i94port

Name: 338, dtype: object
row = cicid            4494327
i94cit               124
i94res               124
i94port              NYC
arr_yyyymmdd    20160424
i94mode                1
i94addr               NY
dep_yyyymmdd    20160501
i94bir                38
i94visa                2
i94count               1
matflag                M
biryear             1978
gender                 F
airline               DY
fltno              07001
visatype              WT
Name: 339, dtype: object
row = cicid            1143832
i94cit               689
i94res               689
i94port              MIA
arr_yyyymmdd    20160406
i94mode                1
i94addr               MA
dep_yyyymmdd    20160416
i94bir                33
i94visa                2
i94count               1
matflag                M
biryear             1983
gender                 F
airline               AA
fltno              00274
visatype              B2
Name: 340, dtype: object
row = cicid            4267184
i94cit               117
i94res 

row = cicid            3599914
i94cit               582
i94res               582
i94port              WAS
arr_yyyymmdd    20160419
i94mode                1
i94addr               MI
dep_yyyymmdd    20160423
i94bir                30
i94visa                1
i94count               1
matflag                M
biryear             1986
gender                 M
airline               UA
fltno              01567
visatype              B1
Name: 513, dtype: object
row = cicid            1434452
i94cit               516
i94res               516
i94port              MIA
arr_yyyymmdd    20160408
i94mode                1
i94addr               NC
dep_yyyymmdd    20160422
i94bir                71
i94visa                2
i94count               1
matflag                M
biryear             1945
gender                 M
airline               AA
fltno              02282
visatype              B2
Name: 514, dtype: object
row = cicid             494733
i94cit               213
i94res               213
i94port

Name: 686, dtype: object
row = cicid            2142356
i94cit               689
i94res               689
i94port              PHI
arr_yyyymmdd    20160411
i94mode                1
i94addr               FL
dep_yyyymmdd    20160417
i94bir                46
i94visa                2
i94count               1
matflag                M
biryear             1970
gender                 M
airline               JJ
fltno              08092
visatype              B2
Name: 687, dtype: object
row = cicid            3859815
i94cit               135
i94res               135
i94port              NYC
arr_yyyymmdd    20160421
i94mode                1
i94addr               NY
dep_yyyymmdd    20160424
i94bir                36
i94visa                2
i94count               1
matflag                M
biryear             1980
gender                 M
airline               AA
fltno              00279
visatype              WT
Name: 688, dtype: object
row = cicid            2187824
i94cit               209
i94res 

row = cicid            5735642
i94cit               213
i94res               213
i94port              CHI
arr_yyyymmdd    20160430
i94mode                1
i94addr               GA
dep_yyyymmdd    20160608
i94bir                64
i94visa                2
i94count               1
matflag                M
biryear             1952
gender                 M
airline               QR
fltno              00725
visatype              B2
Name: 862, dtype: object
row = cicid            5673587
i94cit               116
i94res               116
i94port              DUB
arr_yyyymmdd    20160430
i94mode                1
i94addr               NY
dep_yyyymmdd    20160503
i94bir                28
i94visa                2
i94count               1
matflag                M
biryear             1988
gender                 F
airline               EI
fltno              00105
visatype              WT
Name: 863, dtype: object
row = cicid            4882428
i94cit               148
i94res               112
i94port

In [149]:
conn.commit()
conn.close()