# Exploration and Graphs

#### Team 14: Carlos Moreno, Elizabeth Khan, Jagan Lakshmipathy, and Ziling Huang


__Summary:__
A flight network can be represented by a power-law distribution, in which nodes and relationships are unevenly distributed. Most airports have few relationships, but a few have many, which creates hub-and-spoke structures. A traditional analytics approach may obscure patterns because of this skew, so we use graph analytics to keep the focus on relationships.

Through centrality algorithms such as PageRank, we can understand which airport nodes are most important in the flight traffic network.
With community detection algorithms, we find airport communities and uncover hubs to study the cascading effects of delays and weather.

## Setup and import libraries

In [0]:
#https://www.analyticsvidhya.com/blog/2021/08/best-practices-and-performance-tuning-activities-for-pyspark/
#Create spark session with required configuration
 
from pyspark.sql import SparkSession,SQLContext
# Locations of the connector jars that are passed to Spark through the
# `spark.jars` setting below.
sql_jar = "/path/to/sql_jar_file/sqljdbc42.jar"
spark_snow_jar = "/usr/.../snowflake/spark-snowflake_2.11-2.5.5-spark_2.3.jar"
snow_jdbc_jar = "/usr/.../snowflake/snowflake-jdbc-3.10.3.jar"
oracle_jar = "/usr/path/to/oracle_jar_file//v12/jdbc/lib/oracle6.jar"

# Build the session (or reuse an already-running one, via getOrCreate) with
# the driver/executor sizing, YARN queue, and extra jars configured here.
spark = (
    SparkSession.builder
    .master('yarn')
    .appName('Spark job new_job')
    .config('spark.driver.memory', '10g')
    .config('spark.submit.deployMode', 'client')
    .config('spark.executor.memory', '15g')
    .config('spark.executor.cores', 4)
    .config('spark.yarn.queue', 'short')
    .config(
        'spark.jars',
        '{},{},{},{}'.format(sql_jar, spark_snow_jar, snow_jdbc_jar, oracle_jar),
    )
    .enableHiveSupport()
    .getOrCreate()
)

In [0]:
from pyspark.sql.functions import *
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType,BooleanType,DateType,DoubleType,FloatType
import networkx as nx

from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.rdd import portable_hash
from pyspark.sql.window import Window
from pyspark.sql.functions import to_timestamp, to_date
from pyspark.sql.functions import substring

import pandas as pd
import numpy as np
import math as math
import time
import datetime
import matplotlib
import matplotlib.pyplot as plt
from pylab import rcParams
import matplotlib.ticker as mtick
import seaborn as sns
from graphframes import *
import geopandas as gpd
import plotly as plotly

from heatmap import heatmap, corrplot

pd.set_option("display.max_rows", 999)
pd.set_option("display.max_columns", 200)


from pyspark.ml import *
from pyspark.ml.linalg import *
from pyspark.ml.stat import *
from pyspark.ml.feature import *
from pyspark.sql.window import *

import neo4j 
from neo4j import GraphDatabase

from bokeh.sampledata import us_states
from bokeh.plotting import *
import csv

from graphframes.lib import AggregateMessages as AM
from pyspark.sql import functions as F
from pyspark.sql.types import *
from operator import itemgetter

#Blob credentials
blob_container = "cemgr14c" # The name of your container created in https://portal.azure.com
storage_account = "cemgr14" #The name of your Storage account created in https://portal.azure.com
secret_scope = "w261gr14" # The name of the scope created in your local computer using the Databricks CLI
secret_key = "keygr14" # The name of the secret key created in your local computer using the Databricks CLI
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
mount_path = "/mnt/mids-w261"

In [0]:
#pandas udf
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [0]:
# SAS Token

spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)


In [0]:
# Inspect the Mount's Final Project folder 
display(dbutils.fs.ls(f"{mount_path}/datasets_final_project"))

path,name,size
dbfs:/mnt/mids-w261/datasets_final_project/airlines/,airlines/,0
dbfs:/mnt/mids-w261/datasets_final_project/airlines_data/,airlines_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/,parquet_airlines_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data_3m/,parquet_airlines_data_3m/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data_6m/,parquet_airlines_data_6m/,0
dbfs:/mnt/mids-w261/datasets_final_project/stations_data/,stations_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/weather_data/,weather_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/weather_data_6_hr/,weather_data_6_hr/,0
dbfs:/mnt/mids-w261/datasets_final_project/weather_data_single/,weather_data_single/,0


In [0]:
# Inspect our Team's data blob 
display(dbutils.fs.ls(f"{blob_url}"))

path,name,size
wasbs://cemgr14c@cemgr14.blob.core.windows.net/airline_joins_sel/,airline_joins_sel/,0
wasbs://cemgr14c@cemgr14.blob.core.windows.net/airline_joins_sel.delta/,airline_joins_sel.delta/,0
wasbs://cemgr14c@cemgr14.blob.core.windows.net/airline_joins_sel_CEM/,airline_joins_sel_CEM/,0
wasbs://cemgr14c@cemgr14.blob.core.windows.net/airline_pr_cem.delta/,airline_pr_cem.delta/,0
wasbs://cemgr14c@cemgr14.blob.core.windows.net/airline_rank_CEM_CAR1.delta/,airline_rank_CEM_CAR1.delta/,0
wasbs://cemgr14c@cemgr14.blob.core.windows.net/airline_test_CEM_CAR1.delta/,airline_test_CEM_CAR1.delta/,0
wasbs://cemgr14c@cemgr14.blob.core.windows.net/airline_test_sel/,airline_test_sel/,0
wasbs://cemgr14c@cemgr14.blob.core.windows.net/airline_test_sel.delta/,airline_test_sel.delta/,0
wasbs://cemgr14c@cemgr14.blob.core.windows.net/airline_test_sel_CEM.delta/,airline_test_sel_CEM.delta/,0
wasbs://cemgr14c@cemgr14.blob.core.windows.net/airline_test_sel_CEM_CAR.delta/,airline_test_sel_CEM_CAR.delta/,0


## Read data

In [0]:
#read cleaned data with time of day, and other joined params that are useful for EDA/graphs
final_df_clean_join4  = spark.read.parquet(f"{blob_url}/eda_set_time_of_day_zl_0402") 
final_df_clean_join4  = final_df_clean_join4.cache()

In [0]:
final_df_clean_join5 = final_df_clean_join4.dropna()
final_df_clean_join4.unpersist()
final_df_clean_join5 = final_df_clean_join5.cache()

In [0]:
airportRankingFull = spark.read.parquet(f"{blob_url}/airportRankingFull")

In [0]:
display(airportRankingFull.limit(5))

airport,Connec_Ranking,N_Air_to,Num_connection,Delay_Ranking,Amt_Delay,station_id,air_lat,air_lon,st_lat,st_lon,dist_km,time_zone,rnk
15249,0.0009792927943900586,4,19343,0.0008548845294326957,204149.0,72214093805,30.396499633789062,-84.35030364990234,30.393,-84.353,0.4672385531319621,America/New_York,1
11097,0.0005767268232368813,2,3270,0.000534982115131496,35680.0,72670024045,44.520198822,-109.024002075,44.517,-109.017,0.6593297865406805,America/Denver,1
14877,0.0018938967099701,1,923,0.0017400841202419,23746.0,72458603919,38.79100036621094,-97.6521987915039,38.8,-97.65,1.018694889517569,America/Chicago,1
11721,0.0009326734413421368,5,16327,0.0008596435037470942,183799.0,72637014826,42.96540069580078,-83.74359893798828,42.967,-83.749,0.4740886093131737,America/New_York,1
13029,0.0007967300988846184,3,11132,0.0007459403848987206,144174.0,72551014939,40.85100173950195,-96.75920104980467,40.851,-96.748,0.9421124991815832,America/Chicago,1


In [0]:
airport_select = airportRankingFull.select('airport','Delay_Ranking').withColumn('airport_id_int',col('airport').cast(IntegerType())).drop('airport')

In [0]:
cond12 = [(final_df_clean_join5.ORIGIN_AIRPORT_ID == airport_select.airport_id_int)]
final_df_clean_join5 = final_df_clean_join5.join(airport_select, cond12, 'leftouter')

In [0]:
#Percent Rank
 
# Keep the route columns plus the origin airport's Delay_Ranking, and add the
# percent rank of Delay_Ranking over the whole DataFrame.
# NOTE(review): the window has no partition columns, so Spark presumably has to
# rank every row in a single partition -- confirm this is acceptable at full scale.
final_df_clean_join6 = final_df_clean_join5.select(
    "ORIGIN_AIRPORT_ID",
    "DEST_AIRPORT_ID",
    "Airport_Name_O",
    "Airport_Name_D",
    "Delay_Ranking",
    F.percent_rank()
    .over(Window.partitionBy().orderBy(final_df_clean_join5['Delay_Ranking']))
    .alias("delay_percent_rank"),
)

# Shape the result as a GraphFrames edge list: constant relationship label and
# the origin/destination ids renamed to src/dst.
final_df_clean_join6 = (
    final_df_clean_join6
    .withColumn('relationship', lit('flying_to'))
    .withColumnRenamed('ORIGIN_AIRPORT_ID', 'src')
    .withColumnRenamed('DEST_AIRPORT_ID', 'dst')
)

In [0]:
final_df_clean_join6 = final_df_clean_join6.cache()

In [0]:
final_df_clean_join6 = final_df_clean_join6.withColumn('delay_percent_rank_float',col('delay_percent_rank').cast(FloatType()))


In [0]:

def delay_rank_buckets(perc_rank):
  """Map a delay percent rank to a three-level delay label.

  Args:
    perc_rank: percent rank as a number, or None for a missing value.

  Returns:
    'low_delay' for values <= 1/3, 'moderately_delay' for values in
    (1/3, 2/3], 'high_delay' for values in (2/3, 1], and None for a missing
    value, NaN, or anything above 1.
  """
  # A NULL column value reaches a Python UDF as None; comparing None with a
  # float raises TypeError, so treat it as "no label" instead.
  if perc_rank is None:
    return None
  # The thresholds are checked in ascending order, so each branch only needs
  # its upper bound.
  if perc_rank <= (1/3):
    return 'low_delay'
  if perc_rank <= (2/3):
    return 'moderately_delay'
  if perc_rank <= 1:
    return 'high_delay'
  # NaN fails every comparison and values above 1 match no bucket.
  return None

extract_delay_rank_buckets_udf = udf(delay_rank_buckets)
final_df_clean_join6 = final_df_clean_join6.withColumn("delay_rank_buckets_col", extract_delay_rank_buckets_udf("delay_percent_rank_float"))

In [0]:
final_df_clean_join6.select("delay_rank_buckets_col").distinct().show()


## Graph Implementation and Experiments

## Community Detection Algorithms

In this section, we experiment with Strongly Connected Components and Label Propagation to find clusters of airports with high connectivity and traffic that may propagate delays or be exposed to similar weather patterns.

Label Propagation quickly infers groups based on node labels. 
Strongly Connected components identifies connected clusters.

The Label Propagation algorithm (LPA) is a fast algorithm for finding communities
in a graph. In LPA, nodes select their group based on their direct neighbors. This process is well suited to networks where groupings are less clear and weights can be used
to help a node determine which community to place itself within. It also lends itself
well to semisupervised learning because you can seed the process with preassigned,
indicative node labels.

The steps often used for the Label Propagation pull method are:
1. Every node is initialized with a unique label (an identifier), and, optionally, preliminary “seed” labels can be used.
2. These labels propagate through the network.
3. At every propagation iteration, each node updates its label to match the one with
the maximum weight, which is calculated based on the weights of neighbor nodes
and their relationships. Ties are broken uniformly and randomly.
4. LPA reaches convergence when each node has the majority label of its neighbors.

As labels propagate, densely connected groups of nodes quickly reach a consensus on
a unique label. At the end of the propagation, only a few labels will remain, and nodes
that have the same label belong to the same community.

In [0]:
# Vertex table: final_df_clean_join6 is selected row-for-row here, so the same
# `src` id can appear on many rows. GraphFrames expects a unique `id` per
# vertex, so keep a single row per airport id.
# NOTE(review): only ids that appear as `src` become vertices; ids that occur
# only as `dst` are not added here -- confirm that is intended.
v = (
    final_df_clean_join6
    .select('src', 'Airport_Name_O', 'delay_percent_rank_float')
    .withColumnRenamed('src', 'id')
    .withColumnRenamed('Airport_Name_O', 'name')
    .dropDuplicates(['id'])
)
vertices = v

# Edge table: one row per input row, labelled with the delay bucket.
e = final_df_clean_join6.select('src','dst','delay_rank_buckets_col')
e = e.withColumnRenamed('delay_rank_buckets_col','relationship')
#e = final_df_clean_join6.select('src','dst','relationship')

edges = e



In [0]:
g = GraphFrame(vertices, edges)

In [0]:
result = g.labelPropagation(maxIter=5)
display(result)

In [0]:
result.write.parquet(f"{blob_url}/lpa_result")

### Strongly Connected Components

In [0]:
result2 = g.stronglyConnectedComponents(maxIter=10)
display(result2.select("id", "component"))

In [0]:
result2.write.parquet(f"{blob_url}/scc_result")

## Centrality Algorithms

Centrality algorithms help identify influential points in a flight network that impact the flow of transport. Apply Closeness Centrality when you need to know which nodes disseminate things the fastest. Using weighted relationships can be especially helpful in evaluating interaction speeds in communication and behavioral analyses. We implement this with Apache Spark and GraphFrames.

The goal here is to estimate delays in flight departure to the target destination and to examine how delays propagate through paths and airport communities.
The expected result is a table with column 1: airport names, column 2: connected airport names, and the closeness metric. A score of 1.0 means the airport connects directly to all nodes in its part of the graph. Even if an airport has few connections, a score of 1.0 means the airport has close influence on those connected airports.

In [0]:
final_df_clean_join4  = spark.read.parquet(f"{blob_url}/eda_set_time_of_day_zl_0402") 
final_df_clean_join4 = final_df_clean_join4.cache()


In [0]:
v = final_df_clean_join4.select("Airport_Name_O","air_lat_O", "air_lon_O","Air_Page_Rank_traffic").withColumnRenamed("Airport_Name_O" ,"id").withColumnRenamed("air_lat_O" ,"latitude").withColumnRenamed("air_lon_O" ,"longitude").withColumnRenamed("Air_Page_Rank_traffic","traffic")
e = final_df_clean_join4.select("Airport_Name_O","Airport_Name_D","Air_Page_Rank_traffic","DISTANCE","trip_id").withColumnRenamed("Airport_Name_O","src").withColumnRenamed("Airport_Name_D","dst").withColumnRenamed("Air_Page_Rank_traffic","cost").withColumnRenamed("DISTANCE" ,"distance").withColumnRenamed("trip_id" ,"id")


In [0]:
#Pg104 

def collect_paths(paths):
  """Return `F.collect_set(paths)`.

  NOTE(review): `F.collect_set` builds a Spark Column aggregate expression, so
  wrapping it in a Python UDF presumably does not do what the name suggests --
  confirm before relying on `collect_paths_udf`. Nothing in the visible code
  calls the UDF built from this function.
  """
  collected = F.collect_set(paths)
  return collected

collect_paths_udf = F.udf(collect_paths, ArrayType(StringType()))

# Return schema of the path UDFs: an array of (id, distance) structs.
# `distance` is a hop count that the UDFs build from Python ints (1, hops + 1).
# A plain Python UDF yields null when the returned Python type does not match
# the declared Spark type, so the field is declared IntegerType, not FloatType.
paths_type = ArrayType(StructType([StructField("id", StringType()), StructField("distance", IntegerType())]))

def flatten(ids):
  """Collapse a list of (id, distance) lists into one pair per id.

  All pairs are concatenated and stably sorted by id. When an id occurs more
  than once, the distance of its last occurrence in that order is kept.

  Args:
    ids: iterable of iterables of (id, distance) pairs.

  Returns:
    A list of (id, distance) tuples ordered by id.
  """
  pairs = []
  for group in ids:
    pairs.extend(group)
  pairs.sort(key=itemgetter(0))
  # dict() keeps the first position of each key but the last value seen.
  by_id = dict(pairs)
  return list(by_id.items())

flatten_udf = F.udf(flatten, paths_type)

def new_paths(paths, id):
  """Build the path message a vertex sends to a neighbour.

  Every known (id, distance) pair is forwarded with its distance increased by
  one, skipping any entry for `id` itself, and a final entry for `id` at
  distance 1 is appended.

  Args:
    paths: iterable of (id, distance) pairs known to the sending vertex.
    id: identifier of the sending vertex.

  Returns:
    A list of {"id": ..., "distance": ...} dicts.
  """
  outgoing = []
  for node, hops in paths:
    if node != id:
      outgoing.append({"id": node, "distance": hops + 1})
  outgoing.append({"id": id, "distance": 1})
  return outgoing

new_paths_udf = F.udf(new_paths, paths_type)

def merge_paths(ids, new_ids, id):
  """Merge already-known paths with newly received ones.

  Entries for `id` itself are dropped. When a target id occurs more than once,
  the smallest distance is kept: the pairs are sorted by distance in
  descending order and later pairs overwrite earlier ones.

  Args:
    ids: list of (id, distance) pairs already stored on the vertex.
    new_ids: list of newly received (id, distance) pairs; may be None or empty.
    id: identifier of the vertex being updated.

  Returns:
    A list of {"id": ..., "distance": ...} dicts, one per target id.
  """
  candidates = ids + (new_ids or [])
  candidates = [(node, hops) for node, hops in candidates if node != id]
  candidates.sort(key=itemgetter(1), reverse=True)
  shortest = dict(candidates)
  merged = []
  for node, hops in shortest.items():
    merged.append({"id": node, "distance": hops})
  return merged

merge_paths_udf = F.udf(merge_paths, paths_type)

def calculate_closeness(ids):
  """Return the closeness score for a vertex's list of known paths.

  The score is the number of (id, distance) entries divided by the sum of
  their distances.

  Args:
    ids: list of (id, distance) pairs; may be None or empty.

  Returns:
    The score as a float; 0.0 when there are no entries or the distances sum
    to zero.
  """
  # Always return a float: this function backs a DoubleType UDF, and an int 0
  # returned from a plain Python UDF would not match the declared type.
  if not ids:
    return 0.0
  total_distance = sum(distance for _, distance in ids)
  if total_distance == 0:
    return 0.0
  return len(ids) / total_distance

closeness_udf = F.udf(calculate_closeness, DoubleType())

In [0]:
#This ran for 16 hours and ran into a timeout error
# NOTE(review): the original version selected vertices row-for-row from
# final_df_clean_join4 without deduplicating, so the loop bound
# (g2.vertices.count()) was the number of input rows rather than the number of
# distinct airports -- presumably the cause of the timeout; confirm on a re-run.

#Pg 105

# One vertex row per airport name: GraphFrames expects unique vertex ids.
vertices = (
    final_df_clean_join4
    .select("Airport_Name_O", "air_lat_O", "air_lon_O", "Air_Page_Rank_traffic")
    .withColumnRenamed("Airport_Name_O", "id")
    .withColumnRenamed("air_lat_O", "latitude")
    .withColumnRenamed("air_lon_O", "longitude")
    .withColumnRenamed("Air_Page_Rank_traffic", "traffic")
    .dropDuplicates(["id"])
)
edges = (
    final_df_clean_join4
    .select("Airport_Name_O", "Airport_Name_D", "Air_Page_Rank_traffic", "DISTANCE", "trip_id")
    .withColumnRenamed("Airport_Name_O", "src")
    .withColumnRenamed("Airport_Name_D", "dst")
    .withColumnRenamed("Air_Page_Rank_traffic", "cost")
    .withColumnRenamed("DISTANCE", "distance")
)

g = GraphFrame(vertices, edges)
vertices = g.vertices.withColumn("ids", F.array())
cached_vertices = AM.getCachedDataFrame(vertices)

# The messages below read only the src/dst vertex columns and are aggregated
# with collect_set, so parallel edges between the same pair of airports add
# work but no information. Traverse one edge per (src, dst) pair.
hop_edges = g.edges.select("src", "dst").distinct().cache()
g2 = GraphFrame(cached_vertices, hop_edges)

# Message expressions do not change between iterations, so build them once.
msg_dst = new_paths_udf(AM.src["ids"], AM.src["id"])
msg_src = new_paths_udf(AM.dst["ids"], AM.dst["id"])

# One iteration per vertex, as in the original loop.
num_vertices = g2.vertices.count()
for _ in range(num_vertices):
  # Each vertex collects the path lists offered by its neighbours ...
  agg = g2.aggregateMessages(F.collect_set(AM.msg).alias("agg"), sendToSrc=msg_src, sendToDst=msg_dst)
  res = agg.withColumn("newIds", flatten_udf("agg")).drop("agg")
  # ... and merges them into the paths it already knows.
  new_vertices = (
      g2.vertices
      .join(res, on="id", how="left_outer")
      .withColumn("mergedIds", merge_paths_udf("ids", "newIds", "id"))
      .drop("ids", "newIds")
      .withColumnRenamed("mergedIds", "ids")
  )
  cached_new_vertices = AM.getCachedDataFrame(new_vertices)
  g2 = GraphFrame(cached_new_vertices, g2.edges)


(g2.vertices.withColumn("closeness", closeness_udf("ids")).sort("closeness", ascending=False).show(truncate=False))

### References:

- https://go.neo4j.com/rs/710-RRC-335/images/Neo4j_Graph_Algorithms.pdf, Pg 104-105, Pg 139, Pg 146

- https://graphframes.github.io/graphframes/docs/_site/user-guide.html

- https://docs.databricks.com/_static/notebooks/graphframes-user-guide-py.html