## Airport Pagerank

In [None]:
# Load Libraries
from pyspark.sql.functions import col
from pyspark.sql import types as T
from pyspark.sql import functions as F
from pyspark.sql.types import LongType
from pyspark.sql.functions import udf
import graphframes as GF
from pyspark.sql.functions import lit
from pyspark.sql.functions import to_timestamp

from timezonefinder import TimezoneFinder
import re
import os
import numpy as np
import pandas as pd
import math

In [None]:
# Connection parameters for Azure Blob Storage. They are used below to build the
# SAS-token Spark config key and the wasbs:// base URL.

# Name of the container created in https://portal.azure.com
blob_container = "cont"
# Name of the storage account created in https://portal.azure.com
storage_account = "acc"
# Name of the secret scope created locally with the Databricks CLI
secret_scope = "scope"
# Name of the secret key created locally with the Databricks CLI
secret_key = "key"
# Base URL under which all blob paths in this notebook are resolved
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/path"

### SAS Token

In [None]:
# Register the SAS token for this container/account in the Spark config.
# The token is read from the Databricks secret store rather than hard-coded.
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

In [None]:
# List the top-level folders of the blob container
display(dbutils.fs.ls(f"{blob_url}"))

path,name,size
wasbs://w261-g29@chiebel.blob.core.windows.net/2015_airport_pagerank/,2015_airport_pagerank/,0
wasbs://w261-g29@chiebel.blob.core.windows.net/2016_airport_pagerank/,2016_airport_pagerank/,0
wasbs://w261-g29@chiebel.blob.core.windows.net/2017_airport_pagerank/,2017_airport_pagerank/,0
wasbs://w261-g29@chiebel.blob.core.windows.net/2018_airport_pagerank/,2018_airport_pagerank/,0
wasbs://w261-g29@chiebel.blob.core.windows.net/aiport_codes/,aiport_codes/,0
wasbs://w261-g29@chiebel.blob.core.windows.net/airline_test/,airline_test/,0
wasbs://w261-g29@chiebel.blob.core.windows.net/airlines_df/,airlines_df/,0
wasbs://w261-g29@chiebel.blob.core.windows.net/airlines_df_large/,airlines_df_large/,0
wasbs://w261-g29@chiebel.blob.core.windows.net/airport_list_orig_dest/,airport_list_orig_dest/,0
wasbs://w261-g29@chiebel.blob.core.windows.net/airports_orig_dest/,airports_orig_dest/,0


In [None]:
# List the contents of the airline parquet folder on the mounted final-project storage
display(dbutils.fs.ls("/mnt/mids-w261/datasets_final_project/parquet_airlines_data/"))

path,name,size
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/2015.parquet/,2015.parquet/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/2016.parquet/,2016.parquet/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/2017.parquet/,2017.parquet/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/2018.parquet/,2018.parquet/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/2019.parquet/,2019.parquet/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/airlines_size_test.parquet/,airlines_size_test.parquet/,0


### Load Smaller Airline Data

In [None]:
# Read the smaller airline data set (parquet) from blob storage
df_src_dest = spark.read.parquet(f"{blob_url}/airlines_df/")

In [None]:
# Preview the first two rows of the smaller airline data set
display(df_src_dest.limit(2))

YEAR,QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,OP_UNIQUE_CARRIER,OP_CARRIER_AIRLINE_ID,OP_CARRIER_FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,ORIGIN_STATE_FIPS,DEST_AIRPORT_ID,DEST_CITY_MARKET_ID,DEST,DEST_STATE_FIPS,CRS_DEP_TIME,DEP_DEL15,DEP_TIME_BLK,CRS_ARR_TIME,ARR_TIME_BLK,DISTANCE,DISTANCE_GROUP,BAD_TRAVEL_DAY,DEP_TIMESTAMP,DEP_TIMESTAMP_UTC,DEP_TIMESTAMP_UTC_MINUS2,WX_STATION_ORIG,WX_STATION_DEST
2015,2,5,9,6,2015-05-09,B6,20409,1563,13930,30977,ORD,17,14843,34819,SJU,72,851,0.0,0800-0859,1427,1400-1459,2072.0,9,False,2015-05-09T08:51:00.000+0000,2015-05-09T13:51:00.000+0000,2015-05-09T11:51:00.000+0000,KORD,TJSJ
2015,2,5,9,6,2015-05-09,OO,20304,2827,13930,30977,ORD,17,13342,33342,MKE,55,855,1.0,0800-0859,951,0900-0959,67.0,1,False,2015-05-09T08:55:00.000+0000,2015-05-09T13:55:00.000+0000,2015-05-09T11:55:00.000+0000,KORD,KMKE


### A function to pick source and destination airports and create the adjacency list

In [None]:
def createAdjListRDD(df_f_src_dest):
    '''
    Build an airport adjacency list from a flights DataFrame.

    Each distinct (ORIGIN, DEST) pair becomes one directed edge src -> dst.
    Every airport that appears on either end of an edge gets exactly one
    output record, including airports that never appear as an origin.

    input: source dataframe with 'ORIGIN' and 'DEST' columns
    output: rdd of Rows (src, list of dst), one Row per airport
    '''
    # Distinct directed edges origin -> destination
    airport_f_edges = (df_f_src_dest
                       .select(
                           F.col('ORIGIN').alias('src'),
                           F.col('DEST').alias('dst'))
                       .drop_duplicates())

    # All airports, whether they occur as origin or as destination
    origins_f = airport_f_edges.select(['src'])
    destinations_f = airport_f_edges.select(['dst']).withColumnRenamed('dst', 'src')
    airport_f_nodes = origins_f.union(destinations_f).drop_duplicates()

    # One placeholder edge (airport -> NULL) per airport. The placeholder makes
    # sure every airport has a group in the groupBy below; collect_list is
    # expected to drop the NULL, leaving an empty list for dangling airports.
    placeholder_f_edges = airport_f_nodes.select(['src']).withColumn('dst', lit(None))

    # No intermediate .count() calls: they only triggered extra Spark jobs and
    # their results were discarded.
    edge_f_df = airport_f_edges.union(placeholder_f_edges)

    edges_f_adj_list = edge_f_df.groupBy('src').agg(F.collect_list('dst'))

    return edges_f_adj_list.rdd

In [None]:
# job to initialize the graph
def initGraph(dataRDD):
    """
    Attach a uniform starting rank to every adjacency-list record.

    dataRDD holds (node, neighbours) records. The returned RDD holds
    (node, (1/N, neighbours)) records, where N is the number of records
    in dataRDD.
    """
    # N: one record per node
    numNodes = dataRDD.count()

    def withUniformRank(record):
        return (record[0], ((1/numNodes), record[1]))

    return dataRDD.map(withUniformRank)

In [None]:
# FloatAccumulatorParam class
from pyspark.accumulators import AccumulatorParam

class FloatAccumulatorParam(AccumulatorParam):
    """Accumulator parameter that sums floating point values."""

    def zero(self, value):
        # Return the additive identity, not the supplied value. Echoing the
        # value back would seed every copy of the accumulator with whatever it
        # currently holds, and that amount would then be merged in repeatedly.
        return 0.0

    def addInPlace(self, val1, val2):
        # Merge two partial sums
        return val1 + val2

In [None]:

def runPageRank(graphInitRDD, alpha = 0.15, maxIter = 10, verbose = True):
    """
    Iteratively compute PageRank over an adjacency-list graph.

    Args:
        graphInitRDD: RDD of (node, (rank, neighbours)) records, e.g. the
            output of initGraph.
        alpha: teleportation probability; the damping factor is 1 - alpha.
        maxIter: number of iterations to run.
        verbose: when True, print the dangling mass collected in each iteration.

    Returns:
        RDD of (node, rank) pairs after maxIter iterations. When maxIter is
        zero or negative, the initial ranks are returned unchanged.
    """
    # teleportation probability
    a = sc.broadcast(alpha)

    # damping factor
    d = sc.broadcast(1-a.value)

    # accumulator for the rank held by dangling nodes (nodes with no out-links)
    mmAccum = sc.accumulator(0.0, FloatAccumulatorParam())

    # number of nodes |G|
    n = graphInitRDD.count()

    def initDistribute(line):
        # Spread (1-alpha) * rank of a node evenly over its out-links.
        curNode, (curRank, adjList) = line
        outNodes = list(adjList)

        if outNodes:
            share = d.value*curRank*1/len(outNodes)
            for outNode in outNodes:
                yield (outNode, share)
        else:
            # dangling node: its rank is redistributed to all nodes later
            mmAccum.add(curRank)

        # Always emit the node itself with zero mass so that it stays in the
        # result even if no other node links to it.
        yield (curNode, 0)

    # adjacency list RDD, reused in every iteration
    adjRDD = graphInitRDD.map(lambda x: (x[0], x[1][1])).cache()

    # state that is processed and replaced in each iteration
    intStateRDD = graphInitRDD

    # result if the loop never runs (maxIter <= 0): the initial ranks
    steadyStateRDD = graphInitRDD.mapValues(lambda state: state[0])

    for i in range(maxIter):

        # Rank mass received through in-links
        firstStateRDD = intStateRDD.flatMap(initDistribute).reduceByKey(lambda x,y: x+y)
        # An action is needed here so that the flatMap above actually runs and
        # the dangling mass is added to the accumulator before it is read.
        firstStateRDD.take(1)

        # Read this iteration's dangling mass on the driver, then reset the
        # accumulator for the next iteration.
        mm = float(mmAccum.value)
        mmAccum.value = 0.0

        if verbose:
            print(f'iteration {i + 1}/{maxIter}: dangling mass = {mm}')

        # Per-node share of teleportation and dangling mass:
        #   alpha * 1/|G| + (1-alpha) * m/|G|
        # mm is bound as a default argument so the closure keeps this
        # iteration's value even if it is serialized later.
        secondStateRDD = adjRDD.map(
            lambda node, mm=mm: (node[0], ((a.value/n)+(d.value*mm/n))))

        # Sum both contributions per node
        steadyStateRDD = sc.union([firstStateRDD,secondStateRDD]).reduceByKey(lambda x,y: x+y)
        # Action kept from the original implementation: evaluate steadyStateRDD
        # before it is used in the join below.
        steadyStateRDD.take(1)

        # Re-attach the adjacency lists for the next iteration
        intStateRDD = steadyStateRDD.join(adjRDD)

    return steadyStateRDD

In [None]:
# Build the adjacency list for the smaller airline data set
edges_rdd = createAdjListRDD(df_src_dest)

In [None]:
# Run PageRank on the smaller airline data set
nIter = 10
testGraph = initGraph(edges_rdd).cache()
test_results = (
    runPageRank(testGraph, alpha=0.15, maxIter=nIter, verbose=False)
    .toDF(['code', 'rank'])
)

# Persist the ranks to cloud storage, replacing any earlier run
test_results.write.parquet(f'{blob_url}/small_airport_set_pagerank', mode='overwrite')


In [None]:
# Read the small-set PageRank results back from cloud storage
test_pagerank = spark.read.parquet(f'{blob_url}/small_airport_set_pagerank')

In [None]:
# Order airports by rank, highest first, and show the first two
top_2 = test_pagerank.orderBy(F.desc('rank'))
display(top_2.limit(2))

code,rank
PVD,0.0051106766307268
MBS,0.0051106766307268


## Load the full airline data and compute PageRank

In [None]:
# Load the airline data one year at a time.
# NOTE(review): the folder listing earlier in this notebook shows these parquet
# folders under /mnt/mids-w261/datasets_final_project/parquet_airlines_data/ -
# confirm that the shorter mount path used here resolves to the same data.
df_f_src_dest_2015 = spark.read.parquet('dbfs:/mnt/parquet_airlines_data/2015.parquet/')
df_f_src_dest_2016 = spark.read.parquet('dbfs:/mnt/parquet_airlines_data/2016.parquet/')
df_f_src_dest_2017 = spark.read.parquet('dbfs:/mnt/parquet_airlines_data/2017.parquet/')
df_f_src_dest_2018 = spark.read.parquet('dbfs:/mnt/parquet_airlines_data/2018.parquet/')
# All years (2015-2018) together, read from the joined data set in blob storage
df_f_src_dest = spark.read.parquet(f'{blob_url}/joined_data_traintest')

In [None]:
# Number of rows in the combined (all years) data set
df_f_src_dest.count()

In [None]:
# Preview the first two rows of the combined data set
display(df_f_src_dest.limit(2))

YEAR,QUARTER,MONTH,DAY_OF_MONTH,DAY_OF_WEEK,FL_DATE,OP_UNIQUE_CARRIER,OP_CARRIER_AIRLINE_ID,OP_CARRIER_FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,ORIGIN_STATE_FIPS,DEST_AIRPORT_ID,DEST_CITY_MARKET_ID,DEST,DEST_STATE_FIPS,CRS_DEP_TIME,DEP_DEL15,DEP_TIME_BLK,CRS_ARR_TIME,ARR_TIME_BLK,DISTANCE,DISTANCE_GROUP,BAD_TRAVEL_DAY,DEP_TIMESTAMP,DEP_TIMESTAMP_UTC,DEP_TIMESTAMP_UTC_MINUS2,WX_STATION_ORIG,WX_STATION_DEST,CALL_SIGN_ORG,WX_TIMESTAMP_UTC_START_ORG,WX_TIMESTAMP_UTC_END_ORG,ELEVATION_ORG,SLP_ORG,VIS_ORG,TMP_ORG,DEW_ORG,CIG_ORG,WND_ORG,GUST_ORG,TCB_ORG,OBSC_ORG,PREC_ORG,SNW_ORG,FZFG_ORG,FZDZ_ORG,FZRA_ORG,TSRA_ORG,HSN_ORG,LSN_ORG,BSN_ORG,VV_ORG,TS_ORG,HAIL_ORG,FOG_ORG,VA_ORG,BR_ORG,SQ_ORG,FC_ORG,CALL_SIGN_DEST,WX_TIMESTAMP_UTC_START_DEST,WX_TIMESTAMP_UTC_END_DEST,ELEVATION_DEST,SLP_DEST,VIS_DEST,TMP_DEST,DEW_DEST,CIG_DEST,WND_DEST,GUST_DEST,TCB_DEST,OBSC_DEST,PREC_DEST,SNW_DEST,FZFG_DEST,FZDZ_DEST,FZRA_DEST,TSRA_DEST,HSN_DEST,LSN_DEST,BSN_DEST,VV_DEST,TS_DEST,HAIL_DEST,FOG_DEST,VA_DEST,BR_DEST,SQ_DEST,FC_DEST
2015,1,3,4,3,2015-03-04,EV,20366,5446,11433,31295,DTW,26,10434,30434,AVP,42,1950,0.0,1900-1959,2117,2100-2159,399.0,2,0,2015-03-04T19:50:00.000+0000,2015-03-05T00:50:00.000+0000,2015-03-04T22:50:00.000+0000,KDTW,KAVP,KDTW,2015-03-04T21:53:00.000+0000,2015-03-04T22:52:59.000+0000,192.3,1016.0,10.0,-0.6,-12.2,22001,15.0,22.0,0,0,0.0,0.0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,KAVP,2015-03-04T22:39:00.000+0000,2015-03-04T22:53:59.000+0000,283.5,1011.0,7.0,3.9,0.6,5000,6.0,0.0,0,0,0.0,0.0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
2015,1,3,4,3,2015-03-04,EV,20366,4922,10397,30397,ATL,13,10434,30434,AVP,42,2100,0.0,2100-2159,2300,2300-2359,714.0,3,0,2015-03-04T21:00:00.000+0000,2015-03-05T02:00:00.000+0000,2015-03-05T00:00:00.000+0000,KATL,KAVP,KATL,2015-03-04T23:52:00.000+0000,2015-03-05T00:51:59.000+0000,307.8,1013.0,10.0,21.7,15.0,25000,11.1,0.0,0,0,0.0,0.0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,KAVP,2015-03-04T23:54:00.000+0000,2015-03-05T00:53:59.000+0000,283.5,1011.0,8.0,3.9,0.0,5000,6.0,0.0,0,0,0.0,0.0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [None]:
# Create one airport adjacency list per year.
# Each list must be built from the DataFrame of its own year. The earlier
# version built the 2017 list from the 2016 data and the 2018 list from the
# 2017 data, which made the 2016 and 2017 ranks identical.
edges_f_rdd_2015 = createAdjListRDD(df_f_src_dest_2015)
edges_f_rdd_2016 = createAdjListRDD(df_f_src_dest_2016)
edges_f_rdd_2017 = createAdjListRDD(df_f_src_dest_2017)
edges_f_rdd_2018 = createAdjListRDD(df_f_src_dest_2018)

In [None]:
# For each year compute a separate PageRank and write it to blob storage
nIter = 30

# Pair every adjacency list explicitly with its year, so the output folder name
# cannot drift from the data when the list is reordered or extended.
edges_by_year = [
    (2015, edges_f_rdd_2015),
    (2016, edges_f_rdd_2016),
    (2017, edges_f_rdd_2017),
    (2018, edges_f_rdd_2018),
]

for year, edges_f_rdd in edges_by_year:

    fullGraph = initGraph(edges_f_rdd).cache()
    full_results = runPageRank(fullGraph, alpha = 0.15, maxIter = nIter, verbose = False) \
                .toDF().withColumnRenamed('_1', 'code').withColumnRenamed('_2', 'rank')

    # Write file to cloud storage
    full_results.write.mode('overwrite').parquet(f'{blob_url}/{year}_airport_pagerank')

    # The ranks are on disk now; release this year's cached graph before the
    # next year is processed.
    fullGraph.unpersist()

In [None]:
# Tag each year's ranks with the first and last timestamp of that year, then
# stack the four yearly tables into a single one.
for year in (2015, 2016, 2017, 2018):
    ranks_for_year = spark.read.parquet(f'{blob_url}/{year}_airport_pagerank').select('code', 'rank')
    year_start = to_timestamp(lit(f'01-01-{year} 00:00:00.000'), 'MM-dd-yyyy HH:mm:ss.SSS')
    year_end = to_timestamp(lit(f'12-31-{year} 23:59:59.999'), 'MM-dd-yyyy HH:mm:ss.SSS')
    # Published as page_<year> so the union below can refer to each table by name
    globals()[f'page_{year}'] = (ranks_for_year
                                 .withColumn('year_start', year_start)
                                 .withColumn('year_end', year_end))

yearly_page_rank = page_2015.union(page_2016).union(page_2017).union(page_2018)
yearly_page_rank.write.parquet(f'{blob_url}/yearly_airport_pagerank', mode='overwrite')
display(yearly_page_rank.where(col('code') == 'JAX'))

code,rank,year_start,year_end
JAX,0.0039124900646437,2015-01-01T00:00:00.000+0000,2015-12-31T23:59:59.999+0000
JAX,0.003733689645776,2016-01-01T00:00:00.000+0000,2016-12-31T23:59:59.999+0000
JAX,0.003733689645776,2017-01-01T00:00:00.000+0000,2017-12-31T23:59:59.999+0000
JAX,0.003748472315587,2018-01-01T00:00:00.000+0000,2018-12-31T23:59:59.999+0000


In [None]:
# Read the combined yearly ranks back from blob storage and show the rows for ATL
all_rank = spark.read.parquet(f'{blob_url}/yearly_airport_pagerank')
display(all_rank.where(col('code') == 'ATL'))

code,rank,year_start,year_end
ATL,0.0390555302577556,2017-01-01T00:00:00.000+0000,2017-12-31T23:59:59.999+0000
ATL,0.0376777266400135,2015-01-01T00:00:00.000+0000,2015-12-31T23:59:59.999+0000
ATL,0.0380156950671441,2018-01-01T00:00:00.000+0000,2018-12-31T23:59:59.999+0000
ATL,0.0390555302577556,2016-01-01T00:00:00.000+0000,2016-12-31T23:59:59.999+0000


In [None]:
# Ten highest-ranked airports for 2018
pagerank_2018 = spark.read.parquet(f'{blob_url}/2018_airport_pagerank')
top_10_2018 = pagerank_2018.orderBy(F.desc('rank'))
display(top_10_2018.limit(10))

code,rank
ATL,0.0380156950671441
ORD,0.0361785460515363
DFW,0.0316764291893346
DEN,0.0275426323868994
MSP,0.0259370467845489
DTW,0.0229500515736258
IAH,0.0214199307454624
SLC,0.0186900108553063
PHX,0.0177667320567175
SFO,0.0166404582328289
