In [1]:
import numpy as np
import pandas as pd
from pyspark.sql import functions as f

In [20]:
# Load the arrival-mode lookup (I94MODE.csv) into a Spark DataFrame.
# The first row is used as the header and column types are inferred.
dim_arrival_mode = (
    spark.read
    .options(sep=',', header=True, inferSchema=True)
    .csv('I94MODE.csv')
)
# Preview the first rows through pandas for a readable table.
dim_arrival_mode.toPandas().head()

Unnamed: 0,ID,Mode
0,1,Air
1,2,Sea
2,3,Land
3,9,Not reported


In [23]:
# Rename the two columns to `code` / `mode` and lower-case the mode labels,
# so that neither headers nor values vary by capitalisation.
dim_arrival_mode = (
    dim_arrival_mode
    .toDF('code', 'mode')
    .withColumn('mode', f.lower(f.col('mode')))
)
dim_arrival_mode.toPandas()

Unnamed: 0,code,mode
0,1,air
1,2,sea
2,3,land
3,9,not reported


In [28]:
# Export the table as a single CSV file.
# The write goes through pandas rather than Spark's DataFrameWriter because the
# Spark writer creates an output folder; the table is very small, so collecting
# it to the driver is acceptable.
dim_arrival_mode.toPandas().to_csv(
    'dim_arrival_mode.csv',
    index=False,
    header=True,
)

In [24]:
# Load the visa-type lookup (I94VISA.csv) into a Spark DataFrame.
# The first row is used as the header and column types are inferred.
dim_visa = (
    spark.read
    .options(sep=',', header=True, inferSchema=True)
    .csv('I94VISA.csv')
)
# Preview the first rows through pandas for a readable table.
dim_visa.toPandas().head()

Unnamed: 0,ID,Type
0,1,Business
1,2,Pleasure
2,3,Student


In [25]:
# Rename the two columns to `code` / `type` and lower-case the visa type labels,
# so that neither headers nor values vary by capitalisation.
dim_visa = (
    dim_visa
    .toDF('code', 'type')
    .withColumn('type', f.lower(f.col('type')))
)
dim_visa.toPandas()

Unnamed: 0,code,type
0,1,business
1,2,pleasure
2,3,student


In [29]:
# Export the visa lookup to a single CSV file via pandas (header row kept,
# pandas index column omitted).
dim_visa.toPandas().to_csv(
    'dim_visa.csv',
    index=False,
    header=True,
)

In [44]:
# Load the country lookup (I94CIT_I94RES.csv) into a Spark DataFrame.
# The first row is used as the header and column types are inferred.
dim_countries = (
    spark.read
    .options(sep=',', header=True, inferSchema=True)
    .csv('I94CIT_I94RES.csv')
)
# Preview the first rows through pandas for a readable table.
dim_countries.toPandas().head()

Unnamed: 0,Code,I94CTRY
0,582,MEXICO
1,236,AFGHANISTAN
2,101,ALBANIA
3,316,ALGERIA
4,102,ANDORRA


In [50]:
# Replace the original headers with the lower-case names `code` / `name`.
# Only the column names change; the country names themselves are left as-is.
dim_countries = dim_countries.toDF(*['code', 'name'])
# Show the resulting column names as a quick check.
dim_countries.columns

['code', 'name']

In [46]:
# Export the country lookup to a single CSV file via pandas (header row kept,
# pandas index column omitted).
dim_countries.toPandas().to_csv(
    'dim_countries.csv',
    index=False,
    header=True,
)

In [61]:
# Load the US state lookup (I94ADDR.csv) into a Spark DataFrame.
# The first row is used as the header and column types are inferred.
dim_us_states = (
    spark.read
    .options(sep=',', header=True, inferSchema=True)
    .csv('I94ADDR.csv')
)
# Preview the first rows through pandas for a readable table.
dim_us_states.toPandas().head()

Unnamed: 0,code,state
0,AL,ALABAMA
1,AK,ALASKA
2,AZ,ARIZONA
3,AR,ARKANSAS
4,CA,CALIFORNIA


In [63]:
# Rename the two columns to the lower-case names `code` / `name`.
# Only the headers change; the state names themselves are left as-is.
# The rename starts from `dim_us_states` (the frame loaded from I94ADDR.csv);
# the earlier reference to `states` pointed at a name that is never defined in
# this notebook, so the cell failed with NameError on a fresh kernel.
dim_us_states = dim_us_states.toDF('code', 'name')
# Show the resulting column names as a quick check.
dim_us_states.columns

['code', 'name']

In [64]:
# Export the US state lookup to a single CSV file via pandas (header row kept,
# pandas index column omitted).
dim_us_states.toPandas().to_csv(
    'dim_us_states.csv',
    index=False,
    header=True,
)

In [59]:
# Load the port lookup (I94PORT.csv: port code plus a "NAME, SUFFIX" label)
# into a Spark DataFrame. The first row is used as the header and column types
# are inferred.
dim_orig_port = (
    spark.read
    .options(sep=',', header=True, inferSchema=True)
    .csv('I94PORT.csv')
)
# Preview the first rows through pandas for a readable table.
dim_orig_port.toPandas().head()

Unnamed: 0,ID,Port
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"


In [60]:
# Separate the suffix from the port name: split `Port` on ', ' and keep the
# first piece as `name` and the second piece as `st_or_ctry`.
# A label without a ', ' separator has no second piece, so its `st_or_ctry`
# is expected to come out null (assuming Spark's default non-ANSI behaviour).
# The original `Port` column is then dropped and the remaining columns renamed.
dim_orig_port = (
    dim_orig_port
    .withColumn('name', f.split(f.col('Port'), ', ').getItem(0))
    .withColumn('st_or_ctry', f.split(f.col('Port'), ', ').getItem(1))
    .drop('Port')
    .toDF('port_code', 'name', 'st_or_ctry')
)

dim_orig_port.toPandas().head()

Unnamed: 0,port_code,name,st_or_ctry
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK


In [66]:
# Count the ports for which no `st_or_ctry` value could be extracted (null).
# The check runs on `dim_orig_port`; the earlier reference to `orig_port`
# pointed at a name that is never defined in this notebook, so the cell failed
# with NameError on a fresh kernel.
dim_orig_port.filter(f.col('st_or_ctry').isNull()).count()

78

In [67]:
# Export the port lookup to a single CSV file via pandas (header row kept,
# pandas index column omitted).
dim_orig_port.toPandas().to_csv(
    'dim_orig_port.csv',
    index=False,
    header=True,
)