In [1]:
# imports
import os
import configparser
import datetime
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType
from pyspark.sql.functions import udf

### Read config file

In [2]:
# Load the pipeline settings.
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so check that list and fail with a clear message
# instead of a confusing NoSectionError on the first config.get() below.
CONFIG_FILE = 'config.cfg'
config = configparser.ConfigParser()
if not config.read(CONFIG_FILE):
    raise FileNotFoundError("Could not read configuration file '{}'".format(CONFIG_FILE))

# directory that holds the reference CSV inputs
input_dir = config.get('INPUT', 'INPUT_DIR')

# output prefixes for the staging and the analytics tables
staging_dir = config.get('OUTPUT', 'STAGING_PREFIX')
analytics_dir = config.get('OUTPUT', 'ANALYTICS_PREFIX')

# export the AWS credentials from the config file as environment variables
os.environ['AWS_ACCESS_KEY_ID'] = config.get('AWS', 'AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY'] = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')

# location of the raw immigration data
raw_data_path = config.get('INPUT', 'RAW_DATA_PATH')

In [3]:
def _input_file(filename):
    """Return the location of `filename` inside the configured input directory."""
    return os.path.join(input_dir, filename)


# Reference CSV files that feed the dimension tables and lookups.
states_path = _input_file('us_states.csv')
cities_path = _input_file('us_cities.csv')
temperature_path = _input_file('us_temperature.csv')
airports_path = _input_file('us_interantional_airport_codes.csv')
visa_path = _input_file('visa_category.csv')
mode_path = _input_file('travel_mode.csv')
country_path = _input_file('country_code.csv')

### Create spark session

In [4]:
def create_spark():
    """Build (or fetch the already running) Hive-enabled SparkSession.

    The session requests the spark-sas7bdat and hadoop-aws packages through
    ``spark.jars.packages``.

    Returns:
        The SparkSession returned by ``getOrCreate()``.
    """
    packages = "saurfang:spark-sas7bdat:2.0.0-s_2.11,org.apache.hadoop:hadoop-aws:2.7.0"
    builder = SparkSession.builder.config("spark.jars.packages", packages)
    return builder.enableHiveSupport().getOrCreate()

In [5]:
# Session shared by every cell below (create_spark() uses getOrCreate()).
spark = create_spark()

In [6]:
# Choose the reader from the URI scheme: S3 locations are read as parquet,
# anything else is read as a SAS file through spark-sas7bdat.
# The scheme prefix is matched instead of a bare "s3" substring so that a
# local path which merely contains "s3" (e.g. /data/s3_dump/...) is not
# mistaken for an S3 location.
_S3_SCHEMES = ('s3://', 's3a://', 's3n://')
if raw_data_path.lower().startswith(_S3_SCHEMES):
    df_spark = spark.read.parquet(raw_data_path)
else:
    df_spark = spark.read.format('com.github.saurfang.sas.spark').load(raw_data_path)

In [7]:
# Sanity check: number of rows in the raw immigration dataset.
df_spark.count()

3096313

### Staging Table 1

In [8]:
# Expose the raw DataFrame to Spark SQL under the name 'immigrations_raw'.
df_spark.createOrReplaceTempView('immigrations_raw')

In [9]:
# Staging table 1: project the subset of raw columns that is kept for staging.
# No casting or filtering happens here; values stay as read from the raw data.
staging_1 = spark.sql("""
        SELECT cicid, i94yr, i94mon, i94cit, i94port, arrdate, i94mode, i94addr, i94bir,
               i94visa, count, occup, gender, visatype
        FROM immigrations_raw
""")

In [10]:
# Preview the first 10 staged rows as a pandas DataFrame.
staging_1.limit(10).toPandas()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94port,arrdate,i94mode,i94addr,i94bir,i94visa,count,occup,gender,visatype
0,5748517.0,2016.0,4.0,245.0,LOS,20574.0,1.0,CA,40.0,1.0,1.0,,F,B1
1,5748518.0,2016.0,4.0,245.0,LOS,20574.0,1.0,NV,32.0,1.0,1.0,,F,B1
2,5748519.0,2016.0,4.0,245.0,LOS,20574.0,1.0,WA,29.0,1.0,1.0,,M,B1
3,5748520.0,2016.0,4.0,245.0,LOS,20574.0,1.0,WA,29.0,1.0,1.0,,F,B1
4,5748521.0,2016.0,4.0,245.0,LOS,20574.0,1.0,WA,28.0,1.0,1.0,,M,B1
5,5748522.0,2016.0,4.0,245.0,HHW,20574.0,1.0,HI,57.0,2.0,1.0,,M,B2
6,5748523.0,2016.0,4.0,245.0,HHW,20574.0,1.0,HI,66.0,2.0,1.0,,F,B2
7,5748524.0,2016.0,4.0,245.0,HHW,20574.0,1.0,HI,41.0,2.0,1.0,,F,B2
8,5748525.0,2016.0,4.0,245.0,HOU,20574.0,1.0,FL,27.0,2.0,1.0,,M,B2
9,5748526.0,2016.0,4.0,245.0,LOS,20574.0,1.0,CA,26.0,2.0,1.0,,F,B2


In [11]:
# Register the projection so later SQL can refer to it as 'staging_1'.
staging_1.createOrReplaceTempView('staging_1')

In [12]:
# Count the staged rows that carry a non-null occupation (occup) value.
spark.sql("""
            SELECT COUNT(*) FROM staging_1
            WHERE occup IS NOT NULL
""").show()

+--------+
|count(1)|
+--------+
|    8126|
+--------+



### Save staging_1 as parquet on s3

In [12]:
# Output location of staging table 1.
# NOTE(review): plain string concatenation; assumes staging_dir already ends
# with a path separator. Confirm against config.cfg.
staging_1_path = staging_dir + 'staging_1/'

In [14]:
# Persist staging table 1 as parquet, replacing the output of any previous run.
staging_1.write.mode('overwrite').parquet(staging_1_path)

### Fact table

In [18]:
# Register the two inputs of the fact-table join as SQL views:
#   staging_1 -> projected immigration records
#   staging_5 -> airport reference data (CSV whose first row is the header)
staging_1.createOrReplaceTempView('staging_1')
airports = spark.read.csv(airports_path, header=True)
airports.createOrReplaceTempView('staging_5')

In [19]:
# Print the first 10 rows of staging table 1 as a text table.
staging_1.show(10)

+---------+------+------+------+-------+-------+-------+-------+------+-------+-----+-----+------+--------+
|    cicid| i94yr|i94mon|i94cit|i94port|arrdate|i94mode|i94addr|i94bir|i94visa|count|occup|gender|visatype|
+---------+------+------+------+-------+-------+-------+-------+------+-------+-----+-----+------+--------+
|5748517.0|2016.0|   4.0| 245.0|    LOS|20574.0|    1.0|     CA|  40.0|    1.0|  1.0| null|     F|      B1|
|5748518.0|2016.0|   4.0| 245.0|    LOS|20574.0|    1.0|     NV|  32.0|    1.0|  1.0| null|     F|      B1|
|5748519.0|2016.0|   4.0| 245.0|    LOS|20574.0|    1.0|     WA|  29.0|    1.0|  1.0| null|     M|      B1|
|5748520.0|2016.0|   4.0| 245.0|    LOS|20574.0|    1.0|     WA|  29.0|    1.0|  1.0| null|     F|      B1|
|5748521.0|2016.0|   4.0| 245.0|    LOS|20574.0|    1.0|     WA|  28.0|    1.0|  1.0| null|     M|      B1|
|5748522.0|2016.0|   4.0| 245.0|    HHW|20574.0|    1.0|     HI|  57.0|    2.0|  1.0| null|     M|      B2|
|5748523.0|2016.0|   4.0| 24

In [20]:
# Preview the first 10 airport reference rows as a pandas DataFrame.
airports.limit(10).toPandas()

Unnamed: 0,airport_id,city_code,city,state_code,type,name,elevation_ft,gps_code,iata_code,local_code,latitude,longitude
0,PANC,ANC,anchorage,AK,large_airport,Ted Stevens Anchorage International Airport,152.0,PANC,ANC,ANC,61.17440032958984,-149.99600219726562
1,PAFA,FRB,fairbanks,AK,large_airport,Fairbanks International Airport,439.0,PAFA,FAI,FAI,64.81510162,-147.8560028
2,PAJN,JUN,juneau,AK,medium_airport,Juneau International Airport,21.0,PAJN,JNU,JNU,58.35499954223633,-134.5760040283203
3,PAKT,5KE,ketchikan,AK,medium_airport,Ketchikan International Airport,89.0,PAKT,KTN,KTN,55.35559845,-131.7140045
4,PAKT,KET,ketchikan,AK,medium_airport,Ketchikan International Airport,89.0,PAKT,KTN,KTN,55.35559845,-131.7140045
5,KOLS,NOG,nogales,AZ,medium_airport,Nogales International Airport,3955.0,KOLS,OLS,OLS,31.4177,-110.848
6,KPHX,PHO,phoenix,AZ,large_airport,Phoenix Sky Harbor International Airport,1135.0,KPHX,PHX,PHX,33.43429946899414,-112.01200103759766
7,KTUS,TUC,tucson,AZ,large_airport,Tucson International Airport,2643.0,KTUS,TUS,TUS,32.1161003112793,-110.94100189208984
8,KNYL,YUI,yuma,AZ,medium_airport,Yuma MCAS/Yuma International Airport,213.0,KNYL,YUM,NYL,32.65660095,-114.6060028
9,KNYL,YUM,yuma,AZ,medium_airport,Yuma MCAS/Yuma International Airport,213.0,KNYL,YUM,NYL,32.65660095,-114.6060028


In [15]:
def _sas_days_to_iso(days):
    """Convert a SAS date (days since 1960-01-01) to an ISO 'YYYY-MM-DD' string.

    Args:
        days: Number of days since the SAS epoch, or None for a missing value.

    Returns:
        The ISO-formatted date string, or None when `days` is None.
    """
    # Test for None explicitly: a plain truthiness check would also turn
    # day 0 (the epoch itself, 1960-01-01) into a missing value.
    if days is None:
        return None
    return (datetime.date(1960, 1, 1) + datetime.timedelta(days=days)).isoformat()


# No returnType is passed, so the UDF yields a string column (udf's default).
sas_to_dt = udf(_sas_days_to_iso)

In [16]:
# Make the converter callable from Spark SQL as SAS_TO_DT(...).
spark.udf.register("SAS_TO_DT", sas_to_dt)

<function __main__.<lambda>(x)>

In [23]:
# Fact table: one row per distinct immigration record matched to an airport.
# - The join with staging_5 (airports) is an inner join on port code AND state,
#   so records without a matching airport row are dropped.
# - id and immigrant_id both carry the integer-cast cicid.
# - arrival_date is the SAS day number converted by the SAS_TO_DT UDF.
immigrations = spark.sql("""
    SELECT DISTINCT CAST(staging_1.cicid AS INTEGER) AS id,
           CAST(staging_1.i94yr AS INTEGER) AS year,
           CAST(staging_1.i94mon AS INTEGER) AS month,
           CAST(staging_1.i94cit AS INTEGER) AS country_code,
           staging_5.airport_id AS airport_id,
           staging_1.i94addr AS state_code,
           staging_1.count AS count,
           staging_1.i94port AS city_code,
           SAS_TO_DT(staging_1.arrdate) AS arrival_date,
           CAST(staging_1.cicid AS INTEGER) AS immigrant_id
    FROM staging_1 JOIN
    staging_5 ON (staging_1.i94port == staging_5.city_code 
                  AND
                  staging_1.i94addr == staging_5.state_code)
""")

In [24]:
# Preview the first 10 fact rows as a pandas DataFrame.
immigrations.limit(10).toPandas()

Unnamed: 0,id,year,month,country_code,airport_id,state_code,count,city_code,arrival_date,immigrant_id
0,5749838,2016,4,251,KSFO,CA,1.0,SFR,2016-04-30,5749838
1,5749988,2016,4,253,KLAX,CA,1.0,LOS,2016-04-30,5749988
2,5749997,2016,4,253,KSFO,CA,1.0,SFR,2016-04-30,5749997
3,5750249,2016,4,254,KJFK,NY,1.0,NYC,2016-04-30,5750249
4,5751150,2016,4,254,PHNL,HI,1.0,HHW,2016-04-30,5751150
5,5751200,2016,4,254,PHNL,HI,1.0,HHW,2016-04-30,5751200
6,5751321,2016,4,254,PHNL,HI,1.0,HHW,2016-04-30,5751321
7,5751768,2016,4,254,PHNL,HI,1.0,HHW,2016-04-30,5751768
8,5751870,2016,4,254,PHNL,HI,1.0,HHW,2016-04-30,5751870
9,5752990,2016,4,254,KLAX,CA,1.0,LOS,2016-04-30,5752990


### Partition By year, month, state_code

In [25]:
# Output location of the fact table.
# NOTE(review): assumes analytics_dir already ends with a path separator.
immigrations_path = analytics_dir + 'immigrations/'

In [30]:
# Write the fact table as parquet, partitioned by year / month / state_code.
# mode('overwrite') matches the staging_1 write and keeps this cell re-runnable:
# without an explicit mode the write raises as soon as the target path exists.
immigrations.write.mode('overwrite').partitionBy('year', 'month', 'state_code').parquet(immigrations_path)

### Immigrants Dimension Table

In [31]:
# Load the three code-lookup CSVs (first row is the header in each):
# travel mode, visa category and country code.
mode = spark.read.csv(mode_path, header=True)
visa = spark.read.csv(visa_path, header=True)
country = spark.read.csv(country_path, header=True)

In [32]:
# Register the lookup tables as SQL views, listed in view-number order:
#   staging_6 -> visa categories
#   staging_7 -> travel modes
#   staging_8 -> country codes
visa.createOrReplaceTempView('staging_6')
mode.createOrReplaceTempView('staging_7')
country.createOrReplaceTempView('staging_8')

In [33]:
# Immigrants dimension: person-level attributes with the numeric mode, visa and
# country codes replaced by their labels from the lookup views.
# All three joins are inner joins, so a record whose code is missing from any
# lookup table is dropped.
immigrants = spark.sql("""
                SELECT DISTINCT CAST(staging_1.cicid AS INTEGER) AS immigrant_id,
                       staging_1.gender AS gender,
                       staging_1.visatype AS visa_type,
                       staging_1.occup AS occupation,
                       staging_7.mode As mode,
                       staging_6.category AS visa_category,
                       staging_8.country AS country
                FROM staging_1 JOIN
                     staging_7 ON (staging_1.i94mode == staging_7.id) JOIN
                     staging_6 ON (staging_1.i94visa == staging_6.id) JOIN
                     staging_8 ON (staging_1.i94cit == staging_8.id)
""")

In [34]:
# Preview the first 10 immigrant rows as a pandas DataFrame.
immigrants.limit(10).toPandas()

Unnamed: 0,immigrant_id,gender,visa_type,occupation,mode,visa_category,country
0,1280,F,WT,,Air,Pleasure,Belgium
1,1359,F,WT,,Air,Pleasure,Belgium
2,1616,F,WT,,Air,Pleasure,Belgium
3,1957,M,WT,,Air,Pleasure,Belgium
4,2733,M,B2,,Air,Pleasure,Poland
5,2777,M,B2,,Air,Pleasure,Poland
6,2928,,WB,,Air,Business,Denmark
7,3321,M,WT,,Air,Pleasure,Denmark
8,3415,,WB,,Air,Business,Denmark
9,3936,F,WT,,Air,Pleasure,Finland


### Save immigrants dimension table on s3

In [35]:
# Output location of the immigrants dimension.
# NOTE(review): assumes analytics_dir already ends with a path separator.
immigrants_path = analytics_dir + 'immigrants/'

In [38]:
# Write the immigrants dimension as parquet.
# mode('overwrite') matches the staging_1 write and keeps this cell re-runnable:
# without an explicit mode the write raises as soon as the target path exists.
immigrants.write.mode('overwrite').parquet(immigrants_path)

### States dimension table

In [39]:
# Load the per-state demographics CSV (first row is the header) and preview it.
states = spark.read.csv(states_path, header=True)
states.limit(10).toPandas()

Unnamed: 0,State Code,State,Total Population,Female Population,Number of Veterans,Foreign-born,num_households,avg_households,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,AK,Alaska,298695,145750.0,27492.0,33258.0,107832.0,2.77,0.1080784467682643,0.1095238945001606,0.0687241990554028,0.0810789107391413,0.6325945489370308
1,AL,Alabama,1049629,552381.0,71543.0,52154.0,443976.0,2.36,0.008593459567551,0.0262131799991738,0.4747766441589745,0.0358204576213119,0.4545962586529888
2,AR,Arkansas,589879,303400.0,31704.0,62108.0,238505.0,2.47,0.014476641851183,0.0408549629064022,0.2308731941234185,0.1200800482215227,0.5937151528974733
3,AZ,Arizona,4499542,2272087.0,264505.0,682313.0,1639723.0,2.74,0.0225387805586249,0.0398241075705996,0.0514731755530653,0.2620657143040837,0.6240982220136263
4,CA,California,24822460,12544179.0,928270.0,7448257.0,8330662.0,2.98,0.0132919133860024,0.1438276029663739,0.0647961911734619,0.3097199690559911,0.4683643234181706
5,CO,Colorado,2935669,1481050.0,187896.0,337631.0,1169734.0,2.51,0.0174551251099779,0.0414793743330237,0.0579978054598108,0.1961821914401781,0.6868855036570094
6,CT,Connecticut,885581,453424.0,24953.0,225866.0,332485.0,2.66,0.0110659227672542,0.0435995964635769,0.2092141676094334,0.2797608434298017,0.4563594697299336
7,DC,District of Columbia,672228,352523.0,25963.0,95117.0,300102.0,2.24,0.0084374944082673,0.048274029997839,0.4525497612588246,0.0979038400922756,0.3928348742427934
8,DE,Delaware,71957,39277.0,3063.0,3336.0,29370.0,2.45,0.0055164694595458,0.0158964929112034,0.5887165547383009,0.0734996269054471,0.3163708559855026
9,FL,Florida,6796738,3487375.0,388228.0,1688931.0,2514306.0,2.7,0.0078145487209146,0.032469542689192,0.1898821465101665,0.2231338898620714,0.5466998722176555


In [40]:
# Expose the state demographics to Spark SQL as 'staging_2'.
states.createOrReplaceTempView('staging_2')

In [41]:
# States dimension: rename the CSV columns to snake_case names.
# Backticks quote source column names that contain spaces or hyphens.
# The num_households column of the CSV is not selected.
states_dim = spark.sql("""
        SELECT `State Code` AS state_code,
               State AS state,
               `Total Population` AS population,
               `Female Population` AS female_population,
               `Number of Veterans` AS num_veterans,
               `Foreign-born` AS foreign_born,
               avg_households,
               `American Indian and Alaska Native` AS native,
               Asian AS asian,
               `Black or African-American` AS black,
               `Hispanic or Latino` AS hispanic,
               `White` AS white
        FROM staging_2
""")

In [42]:
# Preview the first 10 rows of the states dimension as a pandas DataFrame.
states_dim.limit(10).toPandas()

Unnamed: 0,state_code,state,population,female_population,num_veterans,foreign_born,avg_households,native,asian,black,hispanic,white
0,AK,Alaska,298695,145750.0,27492.0,33258.0,2.77,0.1080784467682643,0.1095238945001606,0.0687241990554028,0.0810789107391413,0.6325945489370308
1,AL,Alabama,1049629,552381.0,71543.0,52154.0,2.36,0.008593459567551,0.0262131799991738,0.4747766441589745,0.0358204576213119,0.4545962586529888
2,AR,Arkansas,589879,303400.0,31704.0,62108.0,2.47,0.014476641851183,0.0408549629064022,0.2308731941234185,0.1200800482215227,0.5937151528974733
3,AZ,Arizona,4499542,2272087.0,264505.0,682313.0,2.74,0.0225387805586249,0.0398241075705996,0.0514731755530653,0.2620657143040837,0.6240982220136263
4,CA,California,24822460,12544179.0,928270.0,7448257.0,2.98,0.0132919133860024,0.1438276029663739,0.0647961911734619,0.3097199690559911,0.4683643234181706
5,CO,Colorado,2935669,1481050.0,187896.0,337631.0,2.51,0.0174551251099779,0.0414793743330237,0.0579978054598108,0.1961821914401781,0.6868855036570094
6,CT,Connecticut,885581,453424.0,24953.0,225866.0,2.66,0.0110659227672542,0.0435995964635769,0.2092141676094334,0.2797608434298017,0.4563594697299336
7,DC,District of Columbia,672228,352523.0,25963.0,95117.0,2.24,0.0084374944082673,0.048274029997839,0.4525497612588246,0.0979038400922756,0.3928348742427934
8,DE,Delaware,71957,39277.0,3063.0,3336.0,2.45,0.0055164694595458,0.0158964929112034,0.5887165547383009,0.0734996269054471,0.3163708559855026
9,FL,Florida,6796738,3487375.0,388228.0,1688931.0,2.7,0.0078145487209146,0.032469542689192,0.1898821465101665,0.2231338898620714,0.5466998722176555


### Save states dimension table on s3

In [44]:
# Write the states dimension as parquet.
# mode('overwrite') matches the staging_1 write and keeps this cell re-runnable:
# without an explicit mode the write raises as soon as the target path exists.
dim_states_path = analytics_dir + 'states/'
states_dim.write.mode('overwrite').parquet(dim_states_path)

### Cities dimension table

In [45]:
# Load the per-city demographics CSV (first row is the header) and preview it.
cities = spark.read.csv(cities_path, header=True)
cities.limit(10).toPandas()

Unnamed: 0,code,city,state_code,State,Total Population,Female Population,Number of Veterans,Foreign-born,Average Household Size,num_households,American Indian and Alaska Native,Asian,Black or African-American,Hispanic or Latino,White
0,ANC,Anchorage,AK,Alaska,298695,145750.0,27492.0,33258.0,2.77,107832.0,0.1216592175965449,0.1232862953849244,0.0773598486750698,0.0912670115000251,0.7120842330805671
1,MOB,Mobile,AL,Alabama,194305,103030.0,11939.0,7234.0,2.4,80960.0,0.0144926790355369,0.0283986516044363,0.4961117830215383,0.0269112992460307,0.4825146033298165
2,LIA,Little rock,AR,Arkansas,197986,100989.0,12343.0,16640.0,2.36,83892.0,0.0048538785570696,0.0425434121604557,0.43238410796723,0.0782883638237047,0.5167638115826372
3,PHO,Phoenix,AZ,Arizona,1563001,776168.0,72388.0,300702.0,2.89,540831.0,0.0267101556556905,0.0424842978347422,0.0850536883853561,0.42860753128117,0.7430929346814238
4,TUC,Tucson,AZ,Arizona,531674,266781.0,38182.0,82220.0,2.45,217010.0,0.0459097115901849,0.0464363500942306,0.0637608760255344,0.4345237871327166,0.760507378581612
5,YUI,Yuma,AZ,Arizona,94145,45847.0,7182.0,19326.0,2.64,35661.0,0.0130437091720218,0.0125338573477083,0.0396303574273726,0.6060226246747039,0.7402517393382548
6,YUM,Yuma,AZ,Arizona,94145,45847.0,7182.0,19326.0,2.64,35661.0,0.0130437091720218,0.0125338573477083,0.0396303574273726,0.6060226246747039,0.7402517393382548
7,FRE,Fresno,CA,California,520072,263942.0,18410.0,103453.0,3.12,166690.0,0.0218815856266055,0.1448222553800243,0.0885877340060607,0.4925183436139611,0.626165223276777
8,LNB,Long beach,CA,California,474172,236013.0,17463.0,127764.0,2.78,170565.0,0.0270808904785605,0.1436082265507031,0.1369713943463553,0.4384274060889297,0.5862050057784939
9,LOS,Los angeles,CA,California,3971896,2012898.0,85417.0,1485425.0,2.86,1388775.0,0.0160522833427662,0.129157208547253,0.1019331825405297,0.4876089404153583,0.5482646071296933


In [46]:
# Expose the city demographics to Spark SQL as 'staging_3'.
cities.createOrReplaceTempView('staging_3')

In [47]:
# Cities dimension: rename the CSV columns to snake_case names.
# Backticks quote source column names that contain spaces or hyphens.
# The num_households column of the CSV is not selected.
cities_dim = spark.sql("""
            SELECT code AS city_code,
                   city,
                   state_code,
                   State AS state,
                   `Total Population` AS total_population,
                   `Female Population` AS female_population,
                   `Number of Veterans` AS num_veterans,
                   `Foreign-born` AS  foreign_born,
                   `Average Household Size` AS avg_households,
                   `American Indian and Alaska Native` AS native,
                   Asian AS asian,
                   `Black or African-American` AS black,
                   `Hispanic or Latino` AS hispanic,
                   `White` AS white
            FROM staging_3
""")

In [48]:
# Preview the first 10 rows of the cities dimension as a pandas DataFrame.
cities_dim.limit(10).toPandas()

Unnamed: 0,city_code,city,state_code,state,total_population,female_population,num_veterans,foreign_born,avg_households,native,asian,black,hispanic,white
0,ANC,Anchorage,AK,Alaska,298695,145750.0,27492.0,33258.0,2.77,0.1216592175965449,0.1232862953849244,0.0773598486750698,0.0912670115000251,0.7120842330805671
1,MOB,Mobile,AL,Alabama,194305,103030.0,11939.0,7234.0,2.4,0.0144926790355369,0.0283986516044363,0.4961117830215383,0.0269112992460307,0.4825146033298165
2,LIA,Little rock,AR,Arkansas,197986,100989.0,12343.0,16640.0,2.36,0.0048538785570696,0.0425434121604557,0.43238410796723,0.0782883638237047,0.5167638115826372
3,PHO,Phoenix,AZ,Arizona,1563001,776168.0,72388.0,300702.0,2.89,0.0267101556556905,0.0424842978347422,0.0850536883853561,0.42860753128117,0.7430929346814238
4,TUC,Tucson,AZ,Arizona,531674,266781.0,38182.0,82220.0,2.45,0.0459097115901849,0.0464363500942306,0.0637608760255344,0.4345237871327166,0.760507378581612
5,YUI,Yuma,AZ,Arizona,94145,45847.0,7182.0,19326.0,2.64,0.0130437091720218,0.0125338573477083,0.0396303574273726,0.6060226246747039,0.7402517393382548
6,YUM,Yuma,AZ,Arizona,94145,45847.0,7182.0,19326.0,2.64,0.0130437091720218,0.0125338573477083,0.0396303574273726,0.6060226246747039,0.7402517393382548
7,FRE,Fresno,CA,California,520072,263942.0,18410.0,103453.0,3.12,0.0218815856266055,0.1448222553800243,0.0885877340060607,0.4925183436139611,0.626165223276777
8,LNB,Long beach,CA,California,474172,236013.0,17463.0,127764.0,2.78,0.0270808904785605,0.1436082265507031,0.1369713943463553,0.4384274060889297,0.5862050057784939
9,LOS,Los angeles,CA,California,3971896,2012898.0,85417.0,1485425.0,2.86,0.0160522833427662,0.129157208547253,0.1019331825405297,0.4876089404153583,0.5482646071296933


### Save cities dimension table partitioned by state

In [50]:
# Write the cities dimension as parquet, partitioned by state_code.
# mode('overwrite') matches the staging_1 write and keeps this cell re-runnable:
# without an explicit mode the write raises as soon as the target path exists.
dim_cities_path = analytics_dir + 'cities/'
cities_dim.write.mode('overwrite').partitionBy('state_code').parquet(dim_cities_path)

### Temperature Dimension Table

In [51]:
# Load the US temperature CSV (first row is the header) and preview it.
us_temp = spark.read.csv(temperature_path, header=True)
us_temp.limit(10).toPandas()

Unnamed: 0,city_code,dt,AverageTemperature,AverageTemperatureUncertainty,Latitude,Longitude
0,ANC,1900-01-01,-19.308,1.355,61.88N,151.13W
1,ANC,1900-02-01,-10.759,0.916,61.88N,151.13W
2,ANC,1900-03-01,-6.278,0.871,61.88N,151.13W
3,ANC,1900-04-01,-2.922,0.919,61.88N,151.13W
4,ANC,1900-05-01,3.9090000000000007,1.289,61.88N,151.13W
5,ANC,1900-06-01,9.354,0.483,61.88N,151.13W
6,ANC,1900-07-01,12.056,0.583,61.88N,151.13W
7,ANC,1900-08-01,9.473,1.012,61.88N,151.13W
8,ANC,1900-09-01,4.884,0.742,61.88N,151.13W
9,ANC,1900-10-01,-4.16,0.7,61.88N,151.13W


In [54]:
def _signed_degrees(coord, positive_marker):
    """Convert a hemisphere-suffixed coordinate such as '61.88N' to a signed float.

    Args:
        coord: Coordinate text whose last character is the hemisphere letter,
            or None / '' for a missing value.
        positive_marker: Hemisphere letter that keeps the value positive
            ('N' for latitude, 'E' for longitude); any other hemisphere is
            returned as a negative number.

    Returns:
        The signed value as a float, or None when `coord` is missing.
    """
    # Missing coordinates become NULL instead of failing the whole job.
    if not coord:
        return None
    # Built-in float replaces the np.float alias, which newer NumPy releases
    # no longer provide.
    magnitude = float(coord[:-1])
    return magnitude if positive_marker in coord else -1 * magnitude


# NOTE(review): no returnType is passed, so despite the *_as_double names the
# UDFs yield string columns (udf's default). Confirm whether DoubleType is wanted.
lat_as_double = udf(lambda x: _signed_degrees(x, 'N'))

long_as_double = udf(lambda x: _signed_degrees(x, 'E'))

# Make both converters callable from Spark SQL.
spark.udf.register("LAT_AS_DOUBLE", lat_as_double)
spark.udf.register("LONG_AS_DOUBLE", long_as_double)

In [55]:
# Expose the temperature data to Spark SQL as 'staging_4'.
us_temp.createOrReplaceTempView('staging_4')

In [56]:
# Temperature dimension: one row per city_code and date, with year and month
# derived from dt and the hemisphere-suffixed coordinates converted to signed
# numbers by the LAT_AS_DOUBLE / LONG_AS_DOUBLE UDFs.
temp_dim = spark.sql("""
        SELECT city_code,
               dt AS date,
               YEAR(dt) AS year,
               MONTH(dt) AS month,
               AverageTemperature AS avg_temp,
               AverageTemperatureUncertainty AS avg_temp_uncertainty,
               LAT_AS_DOUBLE(Latitude) AS latitude,
               LONG_AS_DOUBLE(Longitude) AS longitude
        FROM staging_4
""")

In [57]:
# Preview the first 10 rows of the temperature dimension as a pandas DataFrame.
temp_dim.limit(10).toPandas()

Unnamed: 0,city_code,date,year,month,avg_temp,avg_temp_uncertainty,latitude,longitude
0,ANC,1900-01-01,1900,1,-19.308,1.355,61.88,-151.13
1,ANC,1900-02-01,1900,2,-10.759,0.916,61.88,-151.13
2,ANC,1900-03-01,1900,3,-6.278,0.871,61.88,-151.13
3,ANC,1900-04-01,1900,4,-2.922,0.919,61.88,-151.13
4,ANC,1900-05-01,1900,5,3.9090000000000007,1.289,61.88,-151.13
5,ANC,1900-06-01,1900,6,9.354,0.483,61.88,-151.13
6,ANC,1900-07-01,1900,7,12.056,0.583,61.88,-151.13
7,ANC,1900-08-01,1900,8,9.473,1.012,61.88,-151.13
8,ANC,1900-09-01,1900,9,4.884,0.742,61.88,-151.13
9,ANC,1900-10-01,1900,10,-4.16,0.7,61.88,-151.13


### Save Temperature Dimension Table partitioned by year, month

In [60]:
# Write the temperature dimension as parquet, partitioned by year / month.
# mode('overwrite') matches the staging_1 write and keeps this cell re-runnable:
# without an explicit mode the write raises as soon as the target path exists.
dim_temp_path = analytics_dir + 'temperature/'
temp_dim.write.mode('overwrite').partitionBy('year', 'month').parquet(dim_temp_path)

### Airports Dimension Table

In [6]:
# Read the airport reference CSV again and (re)register it as 'staging_5',
# so this section does not depend on the fact-table cells having run.
airports = spark.read.csv(airports_path, header=True)
airports.createOrReplaceTempView('staging_5')

In [7]:
# Airports dimension: all reference columns, with latitude and longitude
# rounded to two decimal places.
# NOTE(review): the CSV is read without schema inference, so ROUND presumably
# relies on an implicit string-to-number cast. Verify the resulting types.
dim_airports = spark.sql("""
        SELECT airport_id,
               city_code,
               city,
               state_code,
               type,
               name,
               elevation_ft,
               gps_code,
               iata_code,
               local_code,
               ROUND(latitude, 2) AS latitude,
               ROUND(longitude, 2) AS longitude
        FROM staging_5
""")

In [8]:
# Preview the first 10 rows of the airports dimension as a pandas DataFrame.
dim_airports.limit(10).toPandas()

Unnamed: 0,airport_id,city_code,city,state_code,type,name,elevation_ft,gps_code,iata_code,local_code,latitude,longitude
0,PANC,ANC,anchorage,AK,large_airport,Ted Stevens Anchorage International Airport,152.0,PANC,ANC,ANC,61.17,-150.0
1,PAFA,FRB,fairbanks,AK,large_airport,Fairbanks International Airport,439.0,PAFA,FAI,FAI,64.82,-147.86
2,PAJN,JUN,juneau,AK,medium_airport,Juneau International Airport,21.0,PAJN,JNU,JNU,58.35,-134.58
3,PAKT,5KE,ketchikan,AK,medium_airport,Ketchikan International Airport,89.0,PAKT,KTN,KTN,55.36,-131.71
4,PAKT,KET,ketchikan,AK,medium_airport,Ketchikan International Airport,89.0,PAKT,KTN,KTN,55.36,-131.71
5,KOLS,NOG,nogales,AZ,medium_airport,Nogales International Airport,3955.0,KOLS,OLS,OLS,31.42,-110.85
6,KPHX,PHO,phoenix,AZ,large_airport,Phoenix Sky Harbor International Airport,1135.0,KPHX,PHX,PHX,33.43,-112.01
7,KTUS,TUC,tucson,AZ,large_airport,Tucson International Airport,2643.0,KTUS,TUS,TUS,32.12,-110.94
8,KNYL,YUI,yuma,AZ,medium_airport,Yuma MCAS/Yuma International Airport,213.0,KNYL,YUM,NYL,32.66,-114.61
9,KNYL,YUM,yuma,AZ,medium_airport,Yuma MCAS/Yuma International Airport,213.0,KNYL,YUM,NYL,32.66,-114.61


### Save Airports Dimension Table

In [10]:
# Write the airports dimension as parquet.
# Save dim_airports (the frame with rounded coordinates built above), not the
# raw `airports` CSV frame, so the stored table matches the previewed dimension.
# mode('overwrite') matches the staging_1 write and keeps this cell re-runnable:
# without an explicit mode the write raises as soon as the target path exists.
dim_airports_path = analytics_dir + 'airports/'
dim_airports.write.mode('overwrite').parquet(dim_airports_path)

### Time Dimension Table

In [13]:
# Reload staging table 1 from the parquet output written earlier and
# re-register it as the 'staging_1' view.
staging_1 = spark.read.parquet(staging_1_path)
staging_1.createOrReplaceTempView('staging_1')

In [17]:
# Distinct arrival dates, converted from SAS day numbers by the SAS_TO_DT UDF.
ts = spark.sql("SELECT DISTINCT SAS_TO_DT(arrdate) AS date FROM staging_1")

In [18]:
# Preview the first 10 distinct arrival dates as a pandas DataFrame.
ts.limit(10).toPandas()

Unnamed: 0,date
0,2016-04-22
1,2016-04-15
2,2016-04-18
3,2016-04-09
4,2016-04-11
5,2016-04-12
6,2016-04-27
7,2016-04-01
8,2016-04-08
9,2016-04-26


In [19]:
# Expose the distinct arrival dates to Spark SQL as 'TS'.
ts.createOrReplaceTempView('TS')

In [20]:
# Time dimension: calendar attributes for every distinct arrival date.
# In the preview output 2016-04-22 (a Friday) has weekday 6, i.e. DAYOFWEEK
# numbers the days 1 = Sunday ... 7 = Saturday.
dim_time = spark.sql("""
        SELECT date AS arrival_date,
               YEAR(date) AS year,
               MONTH(date) AS month,
               DAY(date) AS day,
               WEEKOFYEAR(date) AS week,
               DAYOFWEEK(date) AS weekday
        FROM TS
""")

In [21]:
# Preview the first 10 rows of the time dimension as a pandas DataFrame.
dim_time.limit(10).toPandas()

Unnamed: 0,arrival_date,year,month,day,week,weekday
0,2016-04-22,2016,4,22,16,6
1,2016-04-15,2016,4,15,15,6
2,2016-04-18,2016,4,18,16,2
3,2016-04-09,2016,4,9,14,7
4,2016-04-11,2016,4,11,15,2
5,2016-04-12,2016,4,12,15,3
6,2016-04-27,2016,4,27,17,4
7,2016-04-01,2016,4,1,13,6
8,2016-04-08,2016,4,8,14,6
9,2016-04-26,2016,4,26,17,3


### Save Time Dimension Table partitioned by year, month

In [23]:
# Write the time dimension as parquet, partitioned by year / month.
# mode('overwrite') matches the staging_1 write and keeps this cell re-runnable:
# without an explicit mode the write raises as soon as the target path exists.
dim_time_path = analytics_dir + 'time/'
dim_time.write.mode('overwrite').partitionBy('year', 'month').parquet(dim_time_path)