In [82]:
!ls ../data

29_2150_compressed_GlobalLandTemperaturesByCity.csv.zip
29_2150_compressed_GlobalLandTemperaturesByCountry.csv.zip
29_2150_compressed_GlobalLandTemperaturesByMajorCity.csv.zip
29_2150_compressed_GlobalLandTemperaturesByState.csv.zip
I94_SAS_Labels_Descriptions.SAS
airport-codes_csv.csv
capstone-user.csv
datasets_29_2150_GlobalTemperatures.csv
immigration_data_sample.csv
[34moutput[m[m
[34msas_data[m[m
sub-est2019_all.csv
us-cities-demographics.csv
us_states.csv


In [83]:
import configparser
import time
import os
import re

import pandas as pd
import numpy as np
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType as R, StructField as Fld
from pyspark.sql.types import IntegerType, StringType, DoubleType, DateType
from pyspark.sql.functions import udf, col, to_date
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, dayofweek, date_format, from_unixtime

In [84]:
config_s3 = configparser.ConfigParser()
config_s3.read_file(open('../aws_setup.cfg'))

S3_REGION              = config_s3.get('S3', 'REGION')
S3_BUCKET_NAME         = config_s3.get('S3', 'NAME')

In [85]:
output_data = '../data/output'

In [86]:
##i94 schema
i94Schema = R([
    Fld("_c0", StringType()),
    Fld("cicid", DoubleType()),
    Fld("i94yr", DoubleType()),
    Fld("i94mon", DoubleType()),
    Fld("i94cit", DoubleType()),
    Fld("i94res", DoubleType()),
    Fld("i94port", StringType()),
    Fld("arrdate", DoubleType()),
    Fld("i94mode", DoubleType()),
    Fld("i94addr", StringType()),
    Fld("depdate", DoubleType()), 
    Fld("i94bir", DoubleType()), 
    Fld("i94visa", DoubleType()),
    Fld("count", DoubleType()),
    Fld("dtadfile", StringType()),
    Fld("visapost", StringType()),
    Fld("occup", StringType()),
    Fld("entdepa", StringType()),
    Fld("entdepd", StringType()),
    Fld("entdepu", StringType()),
    Fld("matflag", StringType()),
    Fld("biryear", DoubleType()),
    Fld("dtaddto", StringType()),
    Fld("gender", StringType()),
    Fld("insnum", StringType()),
    Fld("airline", StringType()),
    Fld("admnum", StringType()),
    Fld("fltno", StringType()),
    Fld("visatype", StringType()),
])

In [87]:
# pd.read_csv(f's3a://{S3_BUCKET_NAME}/data/immigration_data_sample.csv')

In [88]:
## read
spark = SparkSession.builder \
        .config("spark.jars.packages", "com.amazonaws:aws-java-sdk:1.11.7755") \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .config("spark.jars.packages", "net.java.dev.jets3t:jets3t:0.9.3") \
        .getOrCreate()


df_i94 = spark.read.csv('../data/immigration_data_sample.csv', header=True, schema=i94Schema)
# df_i94 = spark.read.csv(f's3a://{S3_BUCKET_NAME}/data/immigration_data_sample.csv', header=True, schema=i94Schema)

In [89]:
df_i94.select(col('insnum')).dropDuplicates().show(2)

+------+
|insnum|
+------+
|  3517|
|  3993|
+------+
only showing top 2 rows



In [90]:
## for time table
print("\nStarting to process time data.")
st = time.time()
get_timestamp = udf(lambda x: int(x*24*60*60)-315619200, IntegerType())
df_i94 = df_i94.withColumn('arrdate', to_date(from_unixtime(get_timestamp(df_i94.arrdate))))

time_table = df_i94.select(col("arrdate").alias("arrival_date"),
      dayofmonth("arrdate").alias("day"), 
      weekofyear("arrdate").alias("week"), 
      month("arrdate").alias("month"), 
      year("arrdate").alias("year"), 
      dayofweek("arrdate").alias("weekday")).dropDuplicates(['arrival_date'])
print(f" Finished processing time table. Used {(time.time() - st)/60:5.2} min.")


Starting to process time data.
 Finished processing time table. Used 0.0022 min.


In [94]:
time_table.filter(col('arrival_date').isNull()).count()

0

In [None]:
df_i94.show(2)

In [None]:
print(' Writing time data to S3.')
st = time.time()
# write time table to parquet files partitioned by year and month
output_path = os.path.join(output_data, 'time_table_run.parquet')
time_table.write.parquet(output_path, 'overwrite')
print(f"======== time data Done in  {(time.time() - st)/60:5.2}min.")

In [None]:
## for port table
# df_airport = pd.read_csv('../data/airport-codes_csv.csv')
df_states = pd.read_csv('../data/us_states.csv')

res = []
with open('../data/I94_SAS_Labels_Descriptions.SAS', 'r') as file_object:
    for line in file_object:
        if len(line.strip()) > 0:
            res.append(line.strip())

i94_desp = ''.join(res).split(';')

In [None]:
# process port description data
print("\nStarting to process port table data.")
st=time.time()
for desp in i94_desp:
    if 'I94PORT' in desp:
        break
        
desp = desp.replace("INT''L FALLS, MN", "INTL FALLS, MN")
desp = desp.replace("\t\t", "\t")
port_info = re.findall(re.compile("\'([A-Z0-9]+\'\\t=\\t\'.+)\'"), desp)[0]

port_info_list = port_info.split("''")
port_info_list = [x.split("\t=\t") for x in port_info_list]

df_port_info = pd.DataFrame(port_info_list, columns=['port_code', 'port_add'])

df_port_info['port_code'] = df_port_info['port_code'].str.replace("'", "")
df_port_info['port_add'] = df_port_info['port_add'].str.replace("'", "").str.strip()

# process the outlier
no_port_idx = df_port_info['port_add'].str.contains('No PORT Code')
df_port_info.loc[no_port_idx, 'port_add'] = 'NA, NA'

collapsed_idx = df_port_info['port_add'].str.contains('Collapsed')
df_port_info.loc[collapsed_idx, 'port_add'] = 'NA, NA'

unknown_idx = df_port_info['port_add'].str.contains('UNIDENTIFED')
df_port_info.loc[unknown_idx, 'port_add'] = 'NA, NA'

unknown_idx = df_port_info['port_add'].str.contains('UNKNOWN')
df_port_info.loc[unknown_idx, 'port_add'] = 'NA, NA'

df_port_info.loc[df_port_info['port_add'].str.contains('#INTL'), 'port_add'] = 'BELLINGHAM, WASHINGTON WA'
df_port_info.loc[df_port_info['port_add'].str.contains('PASO DEL NORTE,TX'), 'port_add'] = 'PASO DEL NORTE, TX'

states_list = df_states['Abbreviation'].unique()
def is_us(x):
    if len(np.intersect1d(np.array(x.split(' ')), states_list)) > 0:
        return True
    else:
        return False
    
def get_state(x):
    find_state = np.intersect1d(np.array(x.split(' ')), states_list)
    if len(find_state) > 0:
        return find_state[0]
    else:
        return 'NA'

def get_address(x):
    find_state = np.intersect1d(np.array(x.split(' ')), states_list)
    if len(find_state) > 0:
        state = find_state[0]
        x = x.replace(state, '').strip()
        x = x.replace(',', '')
        return x
    else:
        return x   

df_port_info = df_port_info[df_port_info['port_add'].apply(lambda x: is_us(x))]
df_port_info['state'] = df_port_info['port_add'].apply(lambda x: get_state(x))
df_port_info['address'] = df_port_info['port_add'].apply(lambda x: get_address(x))
df_port_info = df_port_info[['port_code', 'state', 'address']]
df_port_info = df_port_info.rename(columns={'port_code': 'port_key'})
print(f" Finished processing port table. Used {(time.time() - st)/60:5.2} min.")

In [None]:
## process country tables
for desp in i94_desp:
    if 'I94CIT & I94RES' in desp:
        break

region_list = re.findall(re.compile("([0-9]+) {1,2}=  {1,2}\'([A-Za-z0-9 ,\(\)\.:\/-]+)\'"), desp)
df_region = pd.DataFrame(region_list, columns=['region_key', 'region_name'])

df_region

In [None]:
## process i94 mode table
for desp in i94_desp:
    if 'I94MODE' in desp:
        break

i94mode_list= re.findall(re.compile("([1-9]) = \'([a-zA-Z ]+)\'"), desp)
df_i94mode = pd.DataFrame(i94mode_list, columns=['i94mode_key', 'i94mode'])
df_i94mode

In [None]:
## process state table
for desp in i94_desp:
    if 'I94ADDR' in desp:
        break

state_info_list = re.findall(re.compile("\'([A-Z9]{2})\'=\'([a-zA-Z \.]+)\'"), desp)
df_state = pd.DataFrame(state_info_list, columns=['state_code', 'state'])
df_state.head()

In [None]:
## process visa table
for desp in i94_desp:
    if 'I94VISA' in desp:
        break

visa_list = re.findall(re.compile("([1-3]) = ([A-Za-z]+)"), desp)
df_visa = pd.DataFrame(visa_list, columns=['visa_key', 'visa_broad_type'])
df_visa

In [None]:
df_i94.columns

In [69]:
## rename i94 table
col_rename = {'cicid': 'i94_key', 
              'i94res': 'res_region_key',
              'i94cit': 'cit_region_key', 
              'i94port': 'port_key',
              'arrdate': 'arrival_date', 
              'i94mode': 'i94mode_key',
              'i94addr': 'state_code', 
              'depdate': 'departure_date', 
              'i94bir': 'age', 
              'i94visa': 'visa_key', 
              'count': 'i94_count', 
              'dtadfile': 'i94_file_date', 
              'dtaddto': 'i94_leave_date', 
              'admnum': 'i94_admin_num'}
for old_col, new_col in col_rename.items():
    df_i94 = df_i94.withColumnRenamed(old_col, new_col)

In [70]:
## process i94 fact table 
select_cols = ['i94_key', 'res_region_key', 'cit_region_key', 'port_key', 
               'arrival_date', 'i94yr', 'i94mon', 'i94mode_key', 'state_code', 
               'departure_date', 'age', 'visa_key', 'i94_count', 'i94_file_date', 
               'occup', 'biryear', 'i94_leave_date', 'gender', 'insnum', 'airline',
               'i94_admin_num', 'fltno', 'visatype']

df_i94 = df_i94.select(select_cols)

# process data type
int_cols = ['i94_key', 'res_region_key', 'cit_region_key', \
            'i94yr', 'i94mon', 'i94mode_key', 'age', 'visa_key', 'i94_count', 'biryear']
for int_col in int_cols:
    df_i94 = df_i94.withColumn(int_col, col(int_col).cast(IntegerType()))
    
get_timestamp = udf(lambda x: None if x is None else int(x*24*60*60)-315619200, IntegerType())
tmp = df_i94.withColumn('departure_date', get_timestamp(df_i94.departure_date))

str_to_date = udf(lambda x: datetime.strptime(x, "%Y%m%d"), DateType())
df_i94 = df_i94.withColumn('i94_file_date', str_to_date(df_i94.i94_file_date))

str_to_date = udf(lambda x: None if x == 'D/S' else datetime.strptime(x, "%m%d%Y"), DateType())
df_i94 = df_i94.withColumn('i94_leave_date', str_to_date(df_i94.i94_leave_date))

remove_zero = udf(lambda x: x.replace('.0', ''))
df_i94 = df_i94.withColumn('i94_admin_num', remove_zero(df_i94.i94_admin_num))

In [74]:
datetime.now().year

2020

In [80]:
df_i94.agg({"i94yr":"max"}).collect()[0][0]

2016

In [71]:
df_i94.show()

+-------+--------------+--------------+--------+------------+-----+------+-----------+----------+--------------+---+--------+---------+-------------+-----+-------+--------------+------+------+-------+-------------+-----+--------+
|i94_key|res_region_key|cit_region_key|port_key|arrival_date|i94yr|i94mon|i94mode_key|state_code|departure_date|age|visa_key|i94_count|i94_file_date|occup|biryear|i94_leave_date|gender|insnum|airline|i94_admin_num|fltno|visatype|
+-------+--------------+--------------+--------+------------+-----+------+-----------+----------+--------------+---+--------+---------+-------------+-----+-------+--------------+------+------+-------+-------------+-----+--------+
|4084316|           209|           209|     HHW|     20566.0| 2016|     4|          1|        HI|       20573.0| 61|       2|        1|   2016-04-22| null|   1955|    2016-07-20|     F|  null|     JL|  56582674633|00782|      WT|
|4422636|           582|           582|     MCA|     20567.0| 2016|     4|      

In [None]:
## process demographics table
df_demo = pd.read_csv('../data/us-cities-demographics.csv', delimiter=';')
df_demo.columns = ['_'.join(x.split(' ')).lower() for x in df_demo.columns]