In [68]:
import pandas as pd
import findspark
findspark.init()

In [69]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("EDA") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1") \
    .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
    .getOrCreate()
df_imm=spark.read.parquet("../data/sas_data")

In [70]:
df_imm.count()

2215673

In [5]:
df_states_mapping=spark.read.option("header","true").format("csv").csv("../data/State_mapping.csv",)

In [6]:
df_states_mapping.head(5)

[Row(code='AL', state='ALABAMA'),
 Row(code='AK', state='ALASKA'),
 Row(code='AZ', state='ARIZONA'),
 Row(code='AR', state='ARKANSAS'),
 Row(code='CA', state='CALIFORNIA')]

In [7]:
df_imm.createOrReplaceTempView("i94_data")
df_states_mapping.createOrReplaceTempView("states_mapping")

22/08/17 20:00:02 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [8]:
out_frame=spark.sql("""
SELECT * FROM
i94_data as id LEFT JOIN states_mapping as sm ON id.i94ADDR = sm.code
""")

In [9]:
import pyspark.sql.functions as F

In [10]:
out_frame.filter(F.col('matflag') == 'M').head(5)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1', code='CA', state='CALIFORNIA'),
 Row(cicid=5748518.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='NV', depdate=20591.0, i94bir=32.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1984.0, dtaddto='10292016', gender='F', insnum=None, airline='VA', admnum=94955622830.0, fltno='00007', visatype='B1', code='NV', state='NEVADA'),
 Row(cicid=5748519.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574

In [11]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, DoubleType
import datetime
from datetime import datetime as dtt
def format_ts(value):
    """
    This is an udf function which reads the value which is unix epoch time and convert that into timestamp in the defined format.
    It returns a string as output.
    """
    epoch = dtt(1960, 1, 1)
    if value:
        ts=(epoch + datetime.timedelta(days=int(value))).strftime("%Y-%m-%d")
        return ts
    return None

def days_of_stay(d1,d2):
    if d1 and d2:
        return (dtt.strptime(d1, "%Y-%m-%d") - dtt.strptime(d2, "%Y-%m-%d")).days
    else:
        return -99999

In [12]:
get_timestamp = udf(lambda x: format_ts(x))
get_daysofstay = udf(days_of_stay, IntegerType())
imm_data = out_frame.withColumn("arrdate_1", get_timestamp(out_frame.arrdate))
imm_data = imm_data.withColumn("depdate_1", get_timestamp(imm_data.depdate))
imm_data = imm_data.withColumn("days_of_stay", get_daysofstay(imm_data.depdate_1, imm_data.arrdate_1))

In [13]:
imm_data.show(5)

+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+----+----------+----------+----------+------------+
|    cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|        admnum|fltno|visatype|code|     state| arrdate_1| depdate_1|days_of_stay|
+---------+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+--------------+-----+--------+----+----------+----------+----------+------------+
|5748517.0|2016.0|   4.0| 245.0| 438.0|    LOS|20574.0|    1.0|     CA|20582.0|  40.0|    1.0|  1.0|20160430|     SYD| null|      G|      O|   null|      M| 1976.0|10292

In [14]:
imm_data=imm_data.filter(F.col('cicid').isNotNull())
dropDisDF=imm_data.dropDuplicates(["cicid"])

In [15]:
dropDisDF.count()

                                                                                

2215673

In [16]:
imm_data.count()

2215673

In [17]:
us_demog = spark.read.option("header","true").option("delimiter", ";").format("csv").csv("../data/us-cities-demographics.csv", inferSchema=True)
global_temp_states = spark.read.option("header","true").format("csv").csv("../data/GlobalLandTemperaturesByState.csv", inferSchema=True)

                                                                                

In [18]:
global_temp_us = global_temp_states.filter(F.col('Country') == "United States")

In [19]:
global_temp_us.printSchema()

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)



In [20]:
median_temperatures=global_temp_us.groupby("State").agg(F.percentile_approx("AverageTemperature", 0.5).alias("median_temperatures"))

In [21]:
median_temperatures=median_temperatures.cache()
mt_df=median_temperatures.toPandas()
mt_df.head()
mf_dt={}
for value in mt_df.iterrows():
    mf_dt[value[1][0]] = value[1][1]
mf_dt

{'Utah': 8.209,
 'Hawaii': 22.506,
 'Minnesota': 5.635,
 'Ohio': 10.389,
 'Arkansas': 15.797000000000002,
 'Oregon': 7.7810000000000015,
 'District Of Columbia': 11.783,
 'Georgia (State)': 17.441000000000006,
 'Texas': 18.378,
 'North Dakota': 5.39,
 'Pennsylvania': 9.079,
 'Connecticut': 8.809000000000003,
 'Nebraska': 9.318999999999999,
 'Vermont': 5.579,
 'Nevada': 9.26,
 'Washington': 7.4460000000000015,
 'Illinois': 11.373,
 'Oklahoma': 15.699000000000002,
 'Delaware': 11.501,
 'Alaska': -5.8039999999999985,
 'New Mexico': 11.614999999999998,
 'West Virginia': 10.947,
 'Missouri': 12.765999999999998,
 'Rhode Island': 8.758000000000001,
 'Montana': 5.122999999999999,
 'Michigan': 6.68,
 'Virginia': 12.707999999999998,
 'North Carolina': 14.637,
 'Wyoming': 4.86,
 'Kansas': 12.418,
 'New Jersey': 10.387,
 'Maryland': 11.9,
 'Alabama': 17.01,
 'Arizona': 15.088,
 'Iowa': 9.525,
 'Massachusetts': 7.524,
 'Kentucky': 13.041,
 'Louisiana': 19.313,
 'Mississippi': 17.575,
 'New Hampshir

In [22]:
city_mappings = spark.read.option("header","true").option("delimiter", ",").format("csv").csv("../data/city-mappings.csv", inferSchema=True)
city_mappings=city_mappings.cache()
cm_df=city_mappings.toPandas()
cm_df.head()

cf_dt={}
for value in cm_df.iterrows():
    cf_dt[value[1][0]] = value[1][1]
cf_dt

{'ALC': 'ALCAN',
 'ANC': 'ANCHORAGE',
 'BAR': 'BAKERAAF-BAKERISLAND',
 'DAC': 'DALTONSCACHE',
 'PIZ': 'DEWSTATIONPTLAYDEW',
 'DTH': 'DUTCHHARBOR',
 'EGL': 'EAGLE',
 'FRB': 'FAIRBANKS',
 'HOM': 'HOMER',
 'HYD': 'HYDER',
 'JUN': 'JUNEAU',
 '5KE': 'KETCHIKAN',
 'KET': 'KETCHIKAN',
 'MOS': 'MOSESPOINTINTERMEDIATE',
 'NIK': 'NIKISKI',
 'NOM': 'NOM',
 'PKC': 'POKERCREEK',
 'ORI': 'PORTLIONSSPB',
 'SKA': 'SKAGWAY',
 'SNP': 'ST.PAULISLAND',
 'TKI': 'TOKEEN',
 'WRA': 'WRANGELL',
 'HSV': 'MADISONCOUNTY-HUNTSVILLE',
 'MOB': 'MOBILE',
 'LIA': 'LITTLEROCK',
 'ROG': 'ROGERSARPT',
 'DOU': 'DOUGLAS',
 'LUK': 'LUKEVILLE',
 'MAP': 'MARIPOSAAZ',
 'NAC': 'NACO',
 'NOG': 'NOGALES',
 'PHO': 'PHOENIX',
 'POR': 'PORTAL',
 'SLU': 'SANLUIS',
 'SAS': 'SASABE',
 'TUC': 'TUCSON',
 'YUI': 'YUMA',
 'AND': 'ANDRADE',
 'BUR': 'BURBANK',
 'CAL': 'CALEXICO',
 'CAO': 'CAMPO',
 'FRE': 'FRESNO',
 'ICP': 'IMPERIALCOUNTY',
 'LNB': 'LONGBEACH',
 'LOS': 'LOSANGELES',
 'BFL': 'MEADOWSFIELD-BAKERSFIELD',
 'OAK': 'OAKLAND',
 'ONT

In [23]:
len((cm_df[cm_df['C_Code'] == 'ALC']['City']).values)

1

In [24]:
## Some null impute by median.
def impute_temperatures(temp,state):
    if temp:
        return temp
    return float(mt_df[mt_df['State'] == state]['median_temperatures'].values[0])

def get_cities(c_code):
    if len((cm_df[cm_df['C_Code'] == 'ALC']['City']).values) > 0:
        return (cm_df[cm_df['C_Code'] == c_code]['City'].values[0]).capitalize()
    return "UNKNW"

def apply_impute_temp_func(mapping_broadcasted):
    def f(temp,state):
        if temp:
            return temp
        return mapping_broadcasted.value.get(state)
    return F.udf(f)

In [25]:
#avg_temps = global_temp_us.groupBy(F.col("State")).avg("AverageTemperature")

In [26]:
b_mf_dt = spark.sparkContext.broadcast(mf_dt)
global_temp_us = global_temp_us.withColumn("AverageTemperature_1", apply_impute_temp_func(b_mf_dt)(F.col('AverageTemperature'), F.col('State')))

In [27]:
global_temp_us.head(25)

[Row(dt='1743-11-01', AverageTemperature=10.722000000000001, AverageTemperatureUncertainty=2.898, State='Alabama', Country='United States', AverageTemperature_1='10.722000000000001'),
 Row(dt='1743-12-01', AverageTemperature=None, AverageTemperatureUncertainty=None, State='Alabama', Country='United States', AverageTemperature_1='17.01'),
 Row(dt='1744-01-01', AverageTemperature=None, AverageTemperatureUncertainty=None, State='Alabama', Country='United States', AverageTemperature_1='17.01'),
 Row(dt='1744-02-01', AverageTemperature=None, AverageTemperatureUncertainty=None, State='Alabama', Country='United States', AverageTemperature_1='17.01'),
 Row(dt='1744-03-01', AverageTemperature=None, AverageTemperatureUncertainty=None, State='Alabama', Country='United States', AverageTemperature_1='17.01'),
 Row(dt='1744-04-01', AverageTemperature=19.075, AverageTemperatureUncertainty=2.902, State='Alabama', Country='United States', AverageTemperature_1='19.075'),
 Row(dt='1744-05-01', AverageTem

In [28]:
impute_temperatures = udf(impute_temperatures, DoubleType())
global_temp_us = global_temp_us.withColumn("AverageTemperature_1", impute_temperatures(global_temp_us.AverageTemperature, global_temp_us.State))

In [29]:
global_temp_us.head(5)

[Row(dt='1743-11-01', AverageTemperature=10.722000000000001, AverageTemperatureUncertainty=2.898, State='Alabama', Country='United States', AverageTemperature_1=10.722000000000001),
 Row(dt='1743-12-01', AverageTemperature=None, AverageTemperatureUncertainty=None, State='Alabama', Country='United States', AverageTemperature_1=17.01),
 Row(dt='1744-01-01', AverageTemperature=None, AverageTemperatureUncertainty=None, State='Alabama', Country='United States', AverageTemperature_1=17.01),
 Row(dt='1744-02-01', AverageTemperature=None, AverageTemperatureUncertainty=None, State='Alabama', Country='United States', AverageTemperature_1=17.01),
 Row(dt='1744-03-01', AverageTemperature=None, AverageTemperatureUncertainty=None, State='Alabama', Country='United States', AverageTemperature_1=17.01)]

In [30]:
us_demog.head()

Row(City='Silver Spring', State='Maryland', Median Age=33.8, Male Population=40601, Female Population=41862, Total Population=82463, Number of Veterans=1562, Foreign-born=30908, Average Household Size=2.6, State Code='MD', Race='Hispanic or Latino', Count=25924)

In [31]:
# Drop Nulls from State column
global_temp_us=global_temp_us.filter(F.col('State').isNotNull())
us_demog=us_demog.filter(F.col('State').isNotNull())

In [32]:
# Extract YYYY MM DD Columns Separately.
global_temp_us = global_temp_us.withColumn("year", F.year("dt")) \
             .withColumn("month", F.month("dt")) \
             .withColumn("dayofmonth", F.dayofmonth("dt")) \
             .withColumn("weekofyear", F.weekofyear("dt")) \
             .withColumn("weekday", F.dayofweek("dt"))

imm_data = imm_data.withColumn("year", F.year("arrdate_1")) \
             .withColumn("month", F.month("arrdate_1")) \
             .withColumn("dayofmonth", F.dayofmonth("arrdate_1")) \
             .withColumn("weekofyear", F.weekofyear("arrdate_1")) \
             .withColumn("weekday", F.dayofweek("arrdate_1"))
get_city = udf(lambda x: get_cities(x))
imm_data = imm_data.withColumn("adj_city", get_city(imm_data.i94port))

In [33]:
imm_data.head(5)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1', code='CA', state='CALIFORNIA', arrdate_1='2016-04-30', depdate_1='2016-05-08', days_of_stay=8, year=2016, month=4, dayofmonth=30, weekofyear=17, weekday=7, adj_city='Losangeles'),
 Row(cicid=5748518.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='NV', depdate=20591.0, i94bir=32.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1984.0, dtaddto='10292016', gender='F', insnum=None, airline='VA', admnum=94955622830.0, fltno='00007',

In [34]:
# Create temp view for each of the staging data to create SQL Statements of them.

In [35]:
imm_data.printSchema()
imm_data.count()
imm_data.createOrReplaceTempView("imm_data")

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

In [36]:
#imm_data.filter((F.col('arrdate_1') > '2013-01-01') & (F.col('arrdate_1') < '2017-12-31')).count()

In [37]:
global_temp_us.printSchema()
global_temp_us.createOrReplaceTempView("global_temp_us")

root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- AverageTemperature_1: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [38]:
us_demog = us_demog.withColumnRenamed("Male Population", "male_pop") \
                   .withColumnRenamed("Female Population", "female_pop") \
                   .withColumnRenamed("Total Population", "total_pop") \
                   .withColumnRenamed("State Code", "state_code")
capitalize_city = udf(lambda x: x.lower().replace(" ", "").capitalize())
us_demog = us_demog.withColumn("city_1", capitalize_city(us_demog.City))
us_demog.printSchema()
us_demog.createOrReplaceTempView("us_demog")

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Median Age: double (nullable = true)
 |-- male_pop: integer (nullable = true)
 |-- female_pop: integer (nullable = true)
 |-- total_pop: integer (nullable = true)
 |-- Number of Veterans: integer (nullable = true)
 |-- Foreign-born: integer (nullable = true)
 |-- Average Household Size: double (nullable = true)
 |-- state_code: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Count: integer (nullable = true)
 |-- city_1: string (nullable = true)



In [39]:
flight_details = spark.read.option("header","true").option("delimiter", ",").format("csv").csv("../data/airport-codes_csv.csv", inferSchema=True)

In [40]:
flight_details.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)



In [41]:
state_code_udf = udf(lambda x: x.split("-")[1])

In [42]:
flight_details = flight_details.withColumn("state_code", state_code_udf(flight_details.iso_region))

In [43]:
flight_details.head(2)

[Row(ident='00A', type='heliport', name='Total Rf Heliport', elevation_ft=11, continent='NA', iso_country='US', iso_region='US-PA', municipality='Bensalem', gps_code='00A', iata_code=None, local_code='00A', coordinates='-74.93360137939453, 40.07080078125', state_code='PA'),
 Row(ident='00AA', type='small_airport', name='Aero B Ranch Airport', elevation_ft=3435, continent='NA', iso_country='US', iso_region='US-KS', municipality='Leoti', gps_code='00AA', iata_code=None, local_code='00AA', coordinates='-101.473911, 38.704022', state_code='KS')]

In [44]:
flight_details.createOrReplaceTempView("flight_details")
joined_flight_details = spark.sql("""
SELECT * FROM flight_details as fd LEFT JOIN states_mapping sm ON fd.state_code = sm.code
""")
joined_flight_details.printSchema()

root
 |-- ident: string (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)
 |-- elevation_ft: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- code: string (nullable = true)
 |-- state: string (nullable = true)



In [45]:
joined_flight_details.filter(F.col('name').isNull()).count()
joined_flight_details.createOrReplaceTempView("flight_info")
joined_flight_details.count()

55075

In [46]:
grouped_flight_stats = joined_flight_details.groupby(F.col("state")).agg(F.count("state").alias("count_of_flights"))

In [47]:
grouped_flight_stats.head(5)

[Row(state='NEW JERSEY', count_of_flights=455),
 Row(state='PENNSYLVANIA', count_of_flights=1344),
 Row(state='ILLINOIS', count_of_flights=903),
 Row(state='MARYLAND', count_of_flights=257),
 Row(state='DIST. OF COLUMBIA', count_of_flights=21)]

In [48]:
arrival_info = spark.sql(
"""
SELECT cicid, arrdate_1 as arrival_dt, depdate_1 as dep_dt, year, month, dayofmonth, weekofyear, weekday, days_of_stay, visatype, state, gender, biryear as birthyear, i94visa as visa_type
FROM imm_data
"""
)

In [49]:
arrival_info.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- arrival_dt: string (nullable = true)
 |-- dep_dt: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)
 |-- weekday: integer (nullable = true)
 |-- days_of_stay: integer (nullable = true)
 |-- visatype: string (nullable = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthyear: double (nullable = true)
 |-- visa_type: double (nullable = true)



In [50]:
arrival_info.head(5)

[Row(cicid=5748517.0, arrival_dt='2016-04-30', dep_dt='2016-05-08', year=2016, month=4, dayofmonth=30, weekofyear=17, weekday=7, days_of_stay=8, visatype='B1', state='CALIFORNIA', gender='F', birthyear=1976.0, visa_type=1.0),
 Row(cicid=5748518.0, arrival_dt='2016-04-30', dep_dt='2016-05-17', year=2016, month=4, dayofmonth=30, weekofyear=17, weekday=7, days_of_stay=17, visatype='B1', state='NEVADA', gender='F', birthyear=1984.0, visa_type=1.0),
 Row(cicid=5748519.0, arrival_dt='2016-04-30', dep_dt='2016-05-08', year=2016, month=4, dayofmonth=30, weekofyear=17, weekday=7, days_of_stay=8, visatype='B1', state='WASHINGTON', gender='M', birthyear=1987.0, visa_type=1.0),
 Row(cicid=5748520.0, arrival_dt='2016-04-30', dep_dt='2016-05-14', year=2016, month=4, dayofmonth=30, weekofyear=17, weekday=7, days_of_stay=14, visatype='B1', state='WASHINGTON', gender='F', birthyear=1987.0, visa_type=1.0),
 Row(cicid=5748521.0, arrival_dt='2016-04-30', dep_dt='2016-05-14', year=2016, month=4, dayofmonth

In [51]:
flight_info = spark.sql("""
SELECT ident as airport_ident, name as airport_name, iata_code, state from  flight_info
""")
flight_info.printSchema()
flight_info.head(5)

root
 |-- airport_ident: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- state: string (nullable = true)



[Row(airport_ident='00A', airport_name='Total Rf Heliport', iata_code=None, state='PENNSYLVANIA'),
 Row(airport_ident='00AA', airport_name='Aero B Ranch Airport', iata_code=None, state='KANSAS'),
 Row(airport_ident='00AK', airport_name='Lowell Field', iata_code=None, state='ALASKA'),
 Row(airport_ident='00AL', airport_name='Epps Airpark', iata_code=None, state='ALABAMA'),
 Row(airport_ident='00AR', airport_name='Newport Hospital & Clinic Heliport', iata_code=None, state='ARKANSAS')]

In [52]:
# Discard
arrival_times = spark.sql("""
SELECT arrdate_1 as date, year, month, dayofmonth, weekofyear, weekday FROM imm_data
""")
arrival_times.printSchema()
arrival_times.head(5)

root
 |-- date: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)
 |-- weekday: integer (nullable = true)



[Row(date='2016-04-30', year=2016, month=4, dayofmonth=30, weekofyear=17, weekday=7),
 Row(date='2016-04-30', year=2016, month=4, dayofmonth=30, weekofyear=17, weekday=7),
 Row(date='2016-04-30', year=2016, month=4, dayofmonth=30, weekofyear=17, weekday=7),
 Row(date='2016-04-30', year=2016, month=4, dayofmonth=30, weekofyear=17, weekday=7),
 Row(date='2016-04-30', year=2016, month=4, dayofmonth=30, weekofyear=17, weekday=7)]

In [53]:
global_temperatures = spark.sql(
"""
SELECT dt as date, AverageTemperature_1 as average_temp, state from global_temp_us
""")
global_temperatures.printSchema()
global_temperatures.head(5)

root
 |-- date: string (nullable = true)
 |-- average_temp: double (nullable = true)
 |-- state: string (nullable = true)



[Row(date='1743-11-01', average_temp=10.722000000000001, state='Alabama'),
 Row(date='1743-12-01', average_temp=17.01, state='Alabama'),
 Row(date='1744-01-01', average_temp=17.01, state='Alabama'),
 Row(date='1744-02-01', average_temp=17.01, state='Alabama'),
 Row(date='1744-03-01', average_temp=17.01, state='Alabama')]

In [54]:
demographics_us = spark.sql(
"""
SELECT row_number() OVER (PARTITION BY '' ORDER BY '') as demog_id, city_1 as city, State as state, male_pop, female_pop, total_pop, Race as race
FROM us_demog
""")

In [55]:
demographics_us.printSchema()
demographics_us.head(5)

root
 |-- demog_id: integer (nullable = false)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- male_pop: integer (nullable = true)
 |-- female_pop: integer (nullable = true)
 |-- total_pop: integer (nullable = true)
 |-- race: string (nullable = true)



[Row(demog_id=1, city='Silverspring', state='Maryland', male_pop=40601, female_pop=41862, total_pop=82463, race='Hispanic or Latino'),
 Row(demog_id=2, city='Quincy', state='Massachusetts', male_pop=44129, female_pop=49500, total_pop=93629, race='White'),
 Row(demog_id=3, city='Hoover', state='Alabama', male_pop=38040, female_pop=46799, total_pop=84839, race='Asian'),
 Row(demog_id=4, city='Ranchocucamonga', state='California', male_pop=88127, female_pop=87105, total_pop=175232, race='Black or African-American'),
 Row(demog_id=5, city='Newark', state='New Jersey', male_pop=138040, female_pop=143873, total_pop=281913, race='White')]

In [56]:
city_list_demog = demographics_us.groupby("city").count()

In [57]:
stats_us_demog = demographics_us.groupBy(F.col("state")).agg(F.sum("male_pop").alias("total_male_pop"), F.avg("male_pop").alias("avg_male_pop"),F.sum("female_pop").alias("total_female_pop"), F.avg("female_pop").alias("avg_female_pop") )
stats_us_demog.createOrReplaceTempView("stats_us_demog")
stats_us_demog.show()
stats_us_demog.count()

+--------------------+--------------+------------------+----------------+------------------+
|               state|total_male_pop|      avg_male_pop|total_female_pop|    avg_female_pop|
+--------------------+--------------+------------------+----------------+------------------+
|                Utah|       2586752|53890.666666666664|         2532925|52769.270833333336|
|              Hawaii|        884035|          176807.0|          879795|          175959.0|
|           Minnesota|       3478803| 64422.27777777778|         3565362| 66025.22222222222|
|                Ohio|       5853254|119454.16326530612|         6243296|127414.20408163265|
|            Arkansas|       1400724|  48300.8275862069|         1482165|51109.137931034486|
|              Oregon|       3537215|         88430.375|         3645330|          91133.25|
|               Texas|      34862194|127700.34432234432|        35691659|130738.67765567766|
|        North Dakota|        476175|           47617.5|          4712

49

In [58]:
# Get the arrival information from immigration data.
arrival_info = spark.sql("""
    SELECT cicid, arrdate_1 as arrival_dt, depdate_1 as dep_dt, year, month, dayofmonth, weekofyear, weekday, days_of_stay, visatype, state, i94addr as state_code, adj_city as city, gender, biryear as birthyear,
    i94visa as visa_category
    FROM imm_data
    """
)
capitalize_state = udf(lambda x: x.lower().capitalize() if x else None)
arrival_info = arrival_info.withColumn("state_1", capitalize_state(arrival_info.state))
arrival_info.createOrReplaceTempView("arrival_info")
arrival_info.head(5)

[Row(cicid=5748517.0, arrival_dt='2016-04-30', dep_dt='2016-05-08', year=2016, month=4, dayofmonth=30, weekofyear=17, weekday=7, days_of_stay=8, visatype='B1', state='CALIFORNIA', state_code='CA', city='Losangeles', gender='F', birthyear=1976.0, visa_category=1.0, state_1='California'),
 Row(cicid=5748518.0, arrival_dt='2016-04-30', dep_dt='2016-05-17', year=2016, month=4, dayofmonth=30, weekofyear=17, weekday=7, days_of_stay=17, visatype='B1', state='NEVADA', state_code='NV', city='Losangeles', gender='F', birthyear=1984.0, visa_category=1.0, state_1='Nevada'),
 Row(cicid=5748519.0, arrival_dt='2016-04-30', dep_dt='2016-05-08', year=2016, month=4, dayofmonth=30, weekofyear=17, weekday=7, days_of_stay=8, visatype='B1', state='WASHINGTON', state_code='WA', city='Losangeles', gender='M', birthyear=1987.0, visa_category=1.0, state_1='Washington'),
 Row(cicid=5748520.0, arrival_dt='2016-04-30', dep_dt='2016-05-14', year=2016, month=4, dayofmonth=30, weekofyear=17, weekday=7, days_of_stay=1

In [59]:
# Get the arrival information from immigration data.
visa_info = spark.sql("""
    SELECT cicid as arrival_id, state,visatype,i94visa as visa_category
    FROM imm_data
    """
)
capitalize_state = udf(lambda x: x.lower().capitalize() if x else None)
visa_info = visa_info.withColumn("state_1", capitalize_state(visa_info.state))
visa_info.createOrReplaceTempView("visa_info")
visa_info.head(5)

[Row(arrival_id=5748517.0, state='CALIFORNIA', visatype='B1', visa_category=1.0, state_1='California'),
 Row(arrival_id=5748518.0, state='NEVADA', visatype='B1', visa_category=1.0, state_1='Nevada'),
 Row(arrival_id=5748519.0, state='WASHINGTON', visatype='B1', visa_category=1.0, state_1='Washington'),
 Row(arrival_id=5748520.0, state='WASHINGTON', visatype='B1', visa_category=1.0, state_1='Washington'),
 Row(arrival_id=5748521.0, state='WASHINGTON', visatype='B1', visa_category=1.0, state_1='Washington')]

In [60]:
# Get the airport information (This is an independent table can is having no relationship to the fact and dimensions)
airport_info = spark.sql("""
SELECT ident as airport_ident, name as airport_name, iata_code, state, state_code from  flight_info
""")
airport_info = airport_info.withColumn("state", capitalize_state(airport_info.state))
airport_info.head(5)

[Row(airport_ident='00A', airport_name='Total Rf Heliport', iata_code=None, state='Pennsylvania', state_code='PA'),
 Row(airport_ident='00AA', airport_name='Aero B Ranch Airport', iata_code=None, state='Kansas', state_code='KS'),
 Row(airport_ident='00AK', airport_name='Lowell Field', iata_code=None, state='Alaska', state_code='AK'),
 Row(airport_ident='00AL', airport_name='Epps Airpark', iata_code=None, state='Alabama', state_code='AL'),
 Row(airport_ident='00AR', airport_name='Newport Hospital & Clinic Heliport', iata_code=None, state='Arkansas', state_code='AR')]

In [61]:
# Global temperatures by date and state in USA. Unfortunately the dataset spans only until 2013, but immigration data is of 2016. Therefore, a simulation of dataset is created to extend the global temperatures.

global_temperatures = spark.sql("""
SELECT dt as date, AverageTemperature_1 as average_temp, state from global_temp_us
""")
global_temperatures.filter(F.col("date") > "2016-01-01").show()


+----+------------+-----+
|date|average_temp|state|
+----+------------+-----+
+----+------------+-----+



In [62]:
import numpy
from datetime import datetime
from dateutil.relativedelta import relativedelta
median_state_mapping=mt_df.to_numpy()
start_date = "2014-01-01"
end_date = "2016-12-31"
days = int((datetime.strptime(end_date, '%Y-%m-%d') - datetime.strptime(start_date, '%Y-%m-%d')).days)
simulate_mappings = {"date":[],"average_temp": [], "state":[]}
for state_mapping in median_state_mapping:
    for _ in range(days):
        if state_mapping[1] < 0:
            random_temp = numpy.random.uniform(low=state_mapping[1],high=10)
        else:
            random_temp = numpy.random.uniform(low=0,high=state_mapping[1])
        simulate_mappings["date"].append(start_date)
        simulate_mappings["average_temp"].append(random_temp)
        simulate_mappings["state"].append(state_mapping[0])
        temp_date = str(datetime.strptime(start_date, '%Y-%m-%d') + relativedelta(days=1)).split(" ")[0]
        start_date = temp_date
        #print(start_date)
simulate_df=pd.DataFrame.from_dict(simulate_mappings)
simulate_df.to_csv("../data/simulate_temp.csv")

In [63]:
simulate_data = spark.read.option("header","true").option("delimiter", ",").format("csv").csv("../data/simulate_temp.csv", inferSchema=True)
simulate_data = simulate_data.select(["date", "average_temp", "state"])
global_temperatures = global_temperatures.union(simulate_data)
global_temperatures = global_temperatures.withColumn("state_1", capitalize_state(global_temperatures.state))
global_temperatures.createOrReplaceTempView("global_temperatures")
global_temperatures.head(5)

[Row(date='1743-11-01', average_temp=10.722000000000001, state='Alabama', state_1='Alabama'),
 Row(date='1743-12-01', average_temp=17.01, state='Alabama', state_1='Alabama'),
 Row(date='1744-01-01', average_temp=17.01, state='Alabama', state_1='Alabama'),
 Row(date='1744-02-01', average_temp=17.01, state='Alabama', state_1='Alabama'),
 Row(date='1744-03-01', average_temp=17.01, state='Alabama', state_1='Alabama')]

In [64]:
#arrival_info.withColumnRenamed("arrival_dt", "date").join(global_temperatures, on=["date","state_1"]).select(["cicid", "date", "average_temp"]).head(5)

In [65]:
# Demographics Data information by State and City in USA.
demographics_us = spark.sql("""
SELECT row_number() OVER (PARTITION BY '' ORDER BY '') as demog_id, city_1 as city, State as state, male_pop, female_pop, total_pop, Race as race
FROM us_demog
""")
demographics_us = demographics_us.withColumn("state", capitalize_city(demographics_us.state))
demographics_us.createOrReplaceTempView("demographics_us")
demographics_us.head(5)

[Row(demog_id=1, city='Silverspring', state='Maryland', male_pop=40601, female_pop=41862, total_pop=82463, race='Hispanic or Latino'),
 Row(demog_id=2, city='Quincy', state='Massachusetts', male_pop=44129, female_pop=49500, total_pop=93629, race='White'),
 Row(demog_id=3, city='Hoover', state='Alabama', male_pop=38040, female_pop=46799, total_pop=84839, race='Asian'),
 Row(demog_id=4, city='Ranchocucamonga', state='California', male_pop=88127, female_pop=87105, total_pop=175232, race='Black or African-American'),
 Row(demog_id=5, city='Newark', state='Newjersey', male_pop=138040, female_pop=143873, total_pop=281913, race='White')]

In [72]:
# Fact Table combining information from dimensions: arrival, global_temperature, demographics_us
imm_dynamics_us = spark.sql("""
SELECT row_number() OVER (PARTITION BY '' ORDER BY '') as imm_dyn_id, ai.cicid, ai.arrival_dt, ai.state_1 as state, vi.visa_category, ai.arrival_dt, gt.average_temp, sm.code
FROM arrival_info ai Left JOIN global_temperatures gt ON ai.arrival_dt = gt.date AND ai.state_1 = gt.state JOIN visa_info vi ON vi.arrival_id = ai.cicid LEFT JOIN states_mapping sm ON sm.code = ai.state_code
""")

In [73]:
imm_dynamics_us.head(2)

                                                                                

[Row(imm_dyn_id=1, cicid=7.0, arrival_dt='2016-04-07', state='Alabama', visa_category=3.0, arrival_dt='2016-04-07', average_temp=None, code='AL'),
 Row(imm_dyn_id=2, cicid=67.0, arrival_dt='2016-04-01', state='Alabama', visa_category=2.0, arrival_dt='2016-04-01', average_temp=None, code='AL')]

In [74]:
imm_dynamics_us.count()

                                                                                

2215673