# Airport Page Rank

#### Load Data

In [0]:
%run /Users/rubyhan@berkeley.edu/team28/Final_Project/Imports

In [0]:
# Working dataset for this notebook: rows of the processed 2015 airline frame
# whose MONTH is below 8, cached because every later cell reads from it.
is_before_august = f.col('MONTH') < 8
main_data = airline_2015_processed_df.filter(is_before_august).cache()

#### Investigate Azure Blob Storage

In [0]:
# Show the top-level entries of the team's blob container.
blob_root_listing = dbutils.fs.ls(blob_url)
display(blob_root_listing)

path,name,size
wasbs://w261-team28-container@team28.blob.core.windows.net/30k_weather_sample_processed/,30k_weather_sample_processed/,0
wasbs://w261-team28-container@team28.blob.core.windows.net/3m_flights_processed/,3m_flights_processed/,0
wasbs://w261-team28-container@team28.blob.core.windows.net/6m_flights_processed/,6m_flights_processed/,0
wasbs://w261-team28-container@team28.blob.core.windows.net/ICAO_station_mapping/,ICAO_station_mapping/,0
wasbs://w261-team28-container@team28.blob.core.windows.net/airline_2019_data-processed/,airline_2019_data-processed/,0
wasbs://w261-team28-container@team28.blob.core.windows.net/data_for_aditya/,data_for_aditya/,0
wasbs://w261-team28-container@team28.blob.core.windows.net/full_airline_data-processed/,full_airline_data-processed/,0
wasbs://w261-team28-container@team28.blob.core.windows.net/graph_test/,graph_test/,0
wasbs://w261-team28-container@team28.blob.core.windows.net/processed/,processed/,0
wasbs://w261-team28-container@team28.blob.core.windows.net/weather_2015_02_21/,weather_2015_02_21/,0


#### Data Size

In [0]:
def file_ls(path: str):
    '''Recursively walk ``path`` and yield an entry for every file beneath it.

    Entries whose path ends in '/' are treated as directories and descended
    into; everything else is yielded as returned by ``dbutils.fs.ls``.

    After the entries of each directory level have been produced, the
    generator also yields one marker string of the form 'DATASIZE: <bytes>'
    holding the summed size of the files found directly at that level
    (sub-directories emit their own marker). Callers must therefore be
    prepared to receive both listing entries and marker strings.
    '''
    level_bytes = 0
    for entry in dbutils.fs.ls(path):
        if entry.path[-1] == '/':
            # Directory: forward everything (files and markers) from below.
            yield from file_ls(entry.path)
        else:
            level_bytes += entry.size
            yield entry
    yield f'DATASIZE: {level_bytes}'

# file_ls interleaves file entries with 'DATASIZE: <bytes>' marker strings
# (one per directory level); keep only the markers and parse their byte counts.
# NOTE(review): the size is measured on AIRLINE_2019_PROCESSED_PATH while
# main_data is derived from airline_2015_processed_df; confirm this is the
# intended dataset.
total_size = [
    int(entry.split(' ')[1])
    for entry in file_ls(AIRLINE_2019_PROCESSED_PATH)
    if 'DATASIZE:' in entry
]

total_gb = sum(total_size)/1e9
print(f'Total Data Size: {total_gb:.2f} GB')
# print(f'Total Number of Records: {main_data.count():,}')

In [0]:
# Print the schema of main_data so the columns used for the graph below can be checked.
main_data.printSchema()

#### Create GraphFrame + PageRank

In [0]:
# Candidate vertices: every distinct (code, city, lat, long) combination seen
# on the origin side and on the destination side of a flight.
ORIGIN = main_data.select(
    'ORIGIN', 'ORIGIN_CITY_NAME', 'AIRPORT_LAT_ORIGIN', 'AIRPORT_LONG_ORIGIN'
).distinct()
DEST = main_data.select(
    'DEST', 'DEST_CITY_NAME', 'AIRPORT_LAT_DEST', 'AIRPORT_LONG_DEST'
).distinct()

# The two frames are stacked and de-duplicated, then the code / city columns
# are renamed to the 'id' / 'name' columns used for graph vertices. The
# latitude / longitude columns keep their *_ORIGIN names.
# NOTE(review): an airport that appears with more than one city name or
# coordinate pair would yield several rows with the same id; confirm the data
# has exactly one row per airport code.
AIRPORT = (
    ORIGIN.union(DEST)
    .distinct()
    .withColumnRenamed('ORIGIN', 'id')
    .withColumnRenamed('ORIGIN_CITY_NAME', 'name')
)

# One edge per row of main_data (origin -> destination), carrying flight
# attributes plus a 'YEAR-MONTH' key formatted as e.g. '2015-03'.
edge_columns = [
    f.col('ORIGIN').alias('src'),
    f.col('DEST').alias('dst'),
    'OP_UNIQUE_CARRIER', 'MONTH', 'QUARTER', 'YEAR',
    'ORIGIN_CITY_NAME', 'DEST_CITY_NAME', 'DISTANCE',
    'AIRPORT_LAT_ORIGIN', 'AIRPORT_LONG_ORIGIN',
    'AIRPORT_LAT_DEST', 'AIRPORT_LONG_DEST',
    f.format_string('%d-%02d', f.col('YEAR'), f.col('MONTH')).alias('YEAR-MONTH'),
]
AIRPORT_EDGES = main_data.select(*edge_columns).cache()

# Build the airport graph and run PageRank for a fixed 5 iterations with a
# reset probability of 0.15; the result is cached for the cells below.
airport_graph = GraphFrame(AIRPORT, AIRPORT_EDGES)
airport_rank = airport_graph.pageRank(resetProbability=0.15, maxIter=5).cache()

In [0]:
# PageRank per airport, highest first.
spark_df = airport_rank.vertices.orderBy("pagerank", ascending=False)

# Attach each flight's ORIGIN airport PageRank to the imputed feature table.
# Left join: flights whose origin has no vertex keep a null pagerank for now.
# The helper 'id' column is dropped again after the join.
origin_matches_vertex = joined_full_null_imputed_df.ORIGIN == spark_df.id
joined_full_null_imputed_df_pagerank = (
    joined_full_null_imputed_df
    .join(spark_df.select('id', 'pagerank'), origin_matches_vertex, "left")
    .drop('id')
    .cache()
)

# Origins without a PageRank score get 0 instead of null.
joined_full_null_imputed_df_pagerank = joined_full_null_imputed_df_pagerank.withColumn(
    'pagerank', f.coalesce(f.col('pagerank'), f.lit(0))
)

# Persist the enriched table to blob storage, replacing any previous output.
JOINED_FULL_NULL_IMPUTED_PAGERANK_PROCESSED_PATH = blob_url + '/processed/joined_full_null_imputed_processed_pagerank_df.parquet'

joined_full_null_imputed_df_pagerank.write.mode('overwrite').parquet(
    JOINED_FULL_NULL_IMPUTED_PAGERANK_PROCESSED_PATH
)

In [0]:
# Collect the PageRank-ordered vertices to pandas once (highest rank first);
# this frame is reused by the plotting cell below.
airport_rank_pd_df = airport_rank.vertices.orderBy("pagerank", ascending=False).toPandas()

# Top 20 airports by PageRank. Slicing the already-collected frame avoids
# re-running the Spark sort and a second collect, and guarantees the table
# shown here matches the data that is plotted.
airport_rank_pd_df.head(20)

Unnamed: 0,id,name,AIRPORT_LAT_ORIGIN,AIRPORT_LONG_ORIGIN,pagerank
0,ATL,"Atlanta, GA",33.6367,-84.428101,20.173987
1,ORD,"Chicago, IL",41.9786,-87.9048,17.066812
2,DFW,"Dallas/Fort Worth, TX",32.896801,-97.038002,14.978179
3,DEN,"Denver, CO",39.861698,-104.672997,11.413825
4,LAX,"Los Angeles, CA",33.942501,-118.407997,9.212781
5,IAH,"Houston, TX",29.9844,-95.3414,8.277812
6,SFO,"San Francisco, CA",37.618999,-122.375,7.572132
7,PHX,"Phoenix, AZ",33.434299,-112.012001,7.328128
8,MSP,"Minneapolis, MN",44.882,-93.221802,7.097367
9,SLC,"Salt Lake City, UT",40.788399,-111.977997,6.856158


In [0]:
# Normalise PageRank against the highest-ranked airport (top airport -> 1.0)
# and round to two decimals; this value is only used in the hover label.
pagerank_scores = airport_rank_pd_df['pagerank']
airport_rank_pd_df['norm_pageRank'] = (pagerank_scores / pagerank_scores.max()).round(2)

# Hover text, e.g. 'IATA: ATL, PageRank: 1.0'.
airport_rank_pd_df['label'] = (
    'IATA: ' + airport_rank_pd_df['id']
    + ', PageRank: ' + airport_rank_pd_df['norm_pageRank'].astype('str')
)

# Marker size and colour are driven by the raw (un-normalised) PageRank.
marker_style = {
    'size': pagerank_scores * 2,
    'color': pagerank_scores,
    'colorbar_title': 'Rank',
}

# One marker per airport, positioned by the latitude / longitude columns.
airport_trace = go.Scattergeo(
    locationmode='USA-states',
    lat=airport_rank_pd_df['AIRPORT_LAT_ORIGIN'],
    lon=airport_rank_pd_df['AIRPORT_LONG_ORIGIN'],
    text=airport_rank_pd_df['label'],
    mode='markers',
    marker=marker_style,
)

fig = go.Figure(data=airport_trace)
fig.update_layout(
    title='Y2015 M1-7 Airport PageRank',
    geo={'projection_type': 'albers usa'},
)
fig.show()