In [1]:
sc

In [2]:
sqlContext

<pyspark.sql.context.SQLContext at 0x107986ad0>

In [3]:
print(sc.version)

2.4.1


# On-Time Flight Performance with GraphFrames for Apache Spark

    0. Preparation
    1. Building the Graph
    2. Simple Queries
        2.1 Determine the number of airports and trips
        2.2 Determining the longest delay in this dataset
        2.3 Determining the number of delayed vs. on-time / early flights
        2.4 What flights departing SEA are most likely to have significant delays
        2.5 What destinations tend to have delays
        2.6 What destinations tend to have significant delays departing from SEA
    3. Vertex Degrees
    4. City / Flight Relationships through Motif Finding
        4.1 What delays might we blame on SFO
    5. Determining Airport Ranking using PageRank
    6. Most popular flights (single city hops)
    7. Top Transfer Cities
    8. Breadth First Search 
    9. Loading the D3 Visualization
    
    References: 
   * [Learning PySpark, Chapter 7 notebook (GitHub)](https://github.com/drabastomek/learningPySpark/blob/master/Chapter07/LearningPySpark_Chapter07.ipynb)

### 0. Preparation
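Extract the airports and departure delays data. The original notebook read these files from S3 / DBFS on Databricks. Here they are read from local copies of the same files.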

In [4]:
from IPython.display import display

In [5]:
# Set File Paths
# tripdelaysFilePath = "/databricks-datasets/flights/departuredelays.csv"
# airportsnaFilePath = "/databricks-datasets/flights/airport-codes-na.txt"
tripdelaysFilePath = "/Users/jeanxu/Documents/UniLU/0_MasterThesis/6_Test/learningPySpark-master/Data/departuredelays.csv"
airportsnaFilePath = "/Users/jeanxu/Documents/UniLU/0_MasterThesis/6_Test/learningPySpark-master/Data/airport-codes-na.txt"


In [6]:

# Obtain airports dataset
airportsna = spark.read.csv(airportsnaFilePath, header='true', inferSchema='true', sep='\t')
airportsna.createOrReplaceTempView("airports_na")

# Obtain departure Delays data
departureDelays = spark.read.csv(tripdelaysFilePath, header='true')
departureDelays.createOrReplaceTempView("departureDelays")
departureDelays.cache()

# Available IATA codes from the departuredelays sample dataset
tripIATA = spark.sql("select distinct iata from (select distinct origin as iata from departureDelays union all select distinct destination as iata from departureDelays) a")
tripIATA.createOrReplaceTempView("tripIATA")

# Only include airports with at least one trip from the departureDelays dataset
airports = spark.sql("select f.IATA, f.City, f.State, f.Country from airports_na f join tripIATA t on t.IATA = f.IATA")
airports.createOrReplaceTempView("airports")
airports.cache()

DataFrame[IATA: string, City: string, State: string, Country: string]

In [7]:
departureDelays.count()

1391578

In [8]:
# Build `departureDelays_geo` DataFrame
#  Obtain key attributes such as the flight date, delay, distance, and airport information (origin, destination)
#  `date` is an MMDDHHMM string: `tripid` is that value cast to int, and `localdate` rebuilds it as a 2014 timestamp
departureDelays_geo = spark.sql("""
  select cast(f.date as int) as tripid,
    cast(concat('2014-', substr(f.date, 1, 2), '-', substr(f.date, 3, 2), ' ',
                substr(f.date, 5, 2), ':', substr(f.date, 7, 2), ':00') as timestamp) as localdate,
    cast(f.delay as int), cast(f.distance as int), f.origin as src, f.destination as dst,
    o.city as city_src, d.city as city_dst, o.state as state_src, d.state as state_dst
  from departuredelays f
    join airports o on o.iata = f.origin
    join airports d on d.iata = f.destination""")

# Create Temporary View and cache
departureDelays_geo.createOrReplaceTempView("departureDelays_geo")
departureDelays_geo.cache()

# Count
departureDelays_geo.count()

1361141
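The query above relies on the raw `date` column being an 8-character MMDDHHMM string (for example `01011111` becomes `2014-01-01 11:11:00` in the first row of the output below). The plain-Python sketch that follows reproduces that conversion so the two derived columns are easier to follow. The helper `parse_trip_date` is hypothetical and not part of the notebook, and the year 2014 is hard-coded exactly as in the SQL.

```python
from datetime import datetime


def parse_trip_date(date_str, year=2014):
    """Convert an MMDDHHMM string into (tripid, local timestamp)."""
    # Like cast(f.date as int): the leading zero of the month is dropped
    tripid = int(date_str)
    # Like the concat(...) expression: '2014-' MM '-' DD ' ' HH ':' MI ':00'
    month, day = int(date_str[0:2]), int(date_str[2:4])
    hour, minute = int(date_str[4:6]), int(date_str[6:8])
    return tripid, datetime(year, month, day, hour, minute)


tripid, localdate = parse_trip_date("01011111")
print(tripid, localdate)  # 1011111 2014-01-01 11:11:00
```

Because `tripid` keeps the MMDDHHMM layout, adding 10000 to it moves the date forward by one day. Section 4 relies on this. `tripid` is really a timestamp rather than a unique key, though. With over 1.3 million flights and far fewer distinct minutes, many flights share the same value.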

In [9]:
# Review the top 10 rows of the `departureDelays_geo` DataFrame
departureDelays_geo.show(10)

+-------+-------------------+-----+--------+---+---+-----------+-------------------+---------+---------+
| tripid|          localdate|delay|distance|src|dst|   city_src|           city_dst|state_src|state_dst|
+-------+-------------------+-----+--------+---+---+-----------+-------------------+---------+---------+
|1011111|2014-01-01 11:11:00|   -5|     221|MSP|INL|Minneapolis|International Falls|       MN|       MN|
|1021111|2014-01-02 11:11:00|    7|     221|MSP|INL|Minneapolis|International Falls|       MN|       MN|
|1031111|2014-01-03 11:11:00|    0|     221|MSP|INL|Minneapolis|International Falls|       MN|       MN|
|1041925|2014-01-04 19:25:00|    0|     221|MSP|INL|Minneapolis|International Falls|       MN|       MN|
|1061115|2014-01-06 11:15:00|   33|     221|MSP|INL|Minneapolis|International Falls|       MN|       MN|
|1071115|2014-01-07 11:15:00|   23|     221|MSP|INL|Minneapolis|International Falls|       MN|       MN|
|1081115|2014-01-08 11:15:00|   -9|     221|MSP|INL|Min

In [10]:
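# Using `display` to view the data
# (In Databricks this renders a table; in Jupyter, IPython's `display` only prints the DataFrame schema, as below)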
display(departureDelays_geo)

DataFrame[tripid: int, localdate: timestamp, delay: int, distance: int, src: string, dst: string, city_src: string, city_dst: string, state_src: string, state_dst: string]

## 1. Building the Graph
Now that we've imported our data, we need to build the graph. This means defining two structures: the vertices (or nodes) and the edges. GraphFrames keeps this step simple, because it only requires a few specific column names:
* Rename IATA airport code to **id** in the Vertices Table
* Start and End airports to **src** and **dst** for the Edges Table (flights)

These are required naming conventions for vertices and edges in GraphFrames as of the time of this writing (Feb. 2016).
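The renaming can be illustrated without Spark. In this sketch, plain lists of dicts stand in for the `airports` and `departureDelays` DataFrames, and `to_graph_tables` is a hypothetical helper. The sketch also lists "dangling" edges whose endpoints are not known vertices. This serves the same purpose as the notebook's joins, which keep only flights whose origin and destination both appear in `airports`.

```python
def to_graph_tables(airports, flights):
    """Rename airport/flight records to the GraphFrames naming conventions."""
    # Vertices: the IATA airport code becomes the required `id` field
    vertices = [{"id": a["IATA"], "City": a["City"]} for a in airports]
    # Edges: origin and destination become the required `src` and `dst` fields
    edges = [{"src": f["origin"], "dst": f["destination"], "delay": f["delay"]}
             for f in flights]
    # Sanity check: collect edges whose endpoints are not known vertices
    ids = {v["id"] for v in vertices}
    dangling = [e for e in edges if e["src"] not in ids or e["dst"] not in ids]
    return vertices, edges, dangling


airports = [{"IATA": "MSP", "City": "Minneapolis"},
            {"IATA": "INL", "City": "International Falls"}]
flights = [{"origin": "MSP", "destination": "INL", "delay": -5},
           {"origin": "MSP", "destination": "ZZZ", "delay": 3}]  # ZZZ: made-up code
vertices, edges, dangling = to_graph_tables(airports, flights)
print(len(vertices), len(edges), len(dangling))  # 2 2 1
```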

**WARNING:** If the graphframes package, required in the cell below, is not installed, follow the instructions [here](http://cdn2.hubspot.net/hubfs/438089/notebooks/help/Setup_graphframes_package.html). 

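Here we install the Python bindings with **pip install graphframes**. The GraphFrames JAR must also be available to Spark, for example by starting PySpark with the `--packages` option.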

In [11]:
# Note: ensure you have already installed the GraphFrames Spark package
from pyspark.sql.functions import *
from graphframes import *


In [13]:

# Create Vertices (airports) and Edges (flights)
tripVertices = airports.withColumnRenamed("IATA", "id").distinct()
tripEdges = departureDelays_geo.select("tripid", "delay", "src", "dst", "city_dst", "state_dst")

# Cache Vertices and Edges
tripEdges.cache()
tripVertices.cache()

DataFrame[id: string, City: string, State: string, Country: string]

In [14]:
# Vertices
#   The vertices of our graph are the airports
display(tripVertices)

DataFrame[id: string, City: string, State: string, Country: string]

In [36]:
tripVertices.show()

+---+----------------+-----+-------+
| id|            City|State|Country|
+---+----------------+-----+-------+
|FAT|          Fresno|   CA|    USA|
|CMH|        Columbus|   OH|    USA|
|PHX|         Phoenix|   AZ|    USA|
|PAH|         Paducah|   KY|    USA|
|COS|Colorado Springs|   CO|    USA|
|MYR|    Myrtle Beach|   SC|    USA|
|RNO|            Reno|   NV|    USA|
|SRQ|        Sarasota|   FL|    USA|
|VLD|        Valdosta|   GA|    USA|
|PSC|           Pasco|   WA|    USA|
|BPT|        Beaumont|   TX|    USA|
|CAE|        Columbia|   SC|    USA|
|LAX|     Los Angeles|   CA|    USA|
|DAY|          Dayton|   OH|    USA|
|AVP|    Wilkes-Barre|   PA|    USA|
|MFR|         Medford|   OR|    USA|
|JFK|        New York|   NY|    USA|
|LAS|       Las Vegas|   NV|    USA|
|BNA|       Nashville|   TN|    USA|
|CLT|       Charlotte|   NC|    USA|
+---+----------------+-----+-------+
only showing top 20 rows



In [15]:
# Edges
#  The edges of our graph are the flights between airports
display(tripEdges)

DataFrame[tripid: int, delay: int, src: string, dst: string, city_dst: string, state_dst: string]

In [37]:
tripEdges.show()

+-------+-----+---+---+-------------------+---------+
| tripid|delay|src|dst|           city_dst|state_dst|
+-------+-----+---+---+-------------------+---------+
|1011111|   -5|MSP|INL|International Falls|       MN|
|1021111|    7|MSP|INL|International Falls|       MN|
|1031111|    0|MSP|INL|International Falls|       MN|
|1041925|    0|MSP|INL|International Falls|       MN|
|1061115|   33|MSP|INL|International Falls|       MN|
|1071115|   23|MSP|INL|International Falls|       MN|
|1081115|   -9|MSP|INL|International Falls|       MN|
|1091115|   11|MSP|INL|International Falls|       MN|
|1101115|   -3|MSP|INL|International Falls|       MN|
|1112015|   -7|MSP|INL|International Falls|       MN|
|1121925|   -5|MSP|INL|International Falls|       MN|
|1131115|   -3|MSP|INL|International Falls|       MN|
|1141115|   -6|MSP|INL|International Falls|       MN|
|1151115|   -7|MSP|INL|International Falls|       MN|
|1161115|   -3|MSP|INL|International Falls|       MN|
|1171115|    4|MSP|INL|Inter

In [16]:
# Build `tripGraph` GraphFrame
#  This GraphFrame is built from the airport vertices and flight edges defined above
tripGraph = GraphFrame(tripVertices, tripEdges)


In [17]:
print(tripGraph)


GraphFrame(v:[id: string, City: string ... 2 more fields], e:[src: string, dst: string ... 4 more fields])


In [18]:

# Build `tripGraphPrime` GraphFrame
#   This GraphFrame keeps only a subset of the edge columns (tripid, delay, src, dst) to make motifs and subgraphs easier to display (below)
tripEdgesPrime = departureDelays_geo.select("tripid", "delay", "src", "dst")
tripGraphPrime = GraphFrame(tripVertices, tripEdgesPrime)

In [39]:
tripEdgesPrime.show()

+-------+-----+---+---+
| tripid|delay|src|dst|
+-------+-----+---+---+
|1011111|   -5|MSP|INL|
|1021111|    7|MSP|INL|
|1031111|    0|MSP|INL|
|1041925|    0|MSP|INL|
|1061115|   33|MSP|INL|
|1071115|   23|MSP|INL|
|1081115|   -9|MSP|INL|
|1091115|   11|MSP|INL|
|1101115|   -3|MSP|INL|
|1112015|   -7|MSP|INL|
|1121925|   -5|MSP|INL|
|1131115|   -3|MSP|INL|
|1141115|   -6|MSP|INL|
|1151115|   -7|MSP|INL|
|1161115|   -3|MSP|INL|
|1171115|    4|MSP|INL|
|1182015|   -5|MSP|INL|
|1191925|   -7|MSP|INL|
|1201115|   -6|MSP|INL|
|1211115|    0|MSP|INL|
+-------+-----+---+---+
only showing top 20 rows



In [41]:
tripGraphPrime.edges.show()

+-------+-----+---+---+
| tripid|delay|src|dst|
+-------+-----+---+---+
|1011111|   -5|MSP|INL|
|1021111|    7|MSP|INL|
|1031111|    0|MSP|INL|
|1041925|    0|MSP|INL|
|1061115|   33|MSP|INL|
|1071115|   23|MSP|INL|
|1081115|   -9|MSP|INL|
|1091115|   11|MSP|INL|
|1101115|   -3|MSP|INL|
|1112015|   -7|MSP|INL|
|1121925|   -5|MSP|INL|
|1131115|   -3|MSP|INL|
|1141115|   -6|MSP|INL|
|1151115|   -7|MSP|INL|
|1161115|   -3|MSP|INL|
|1171115|    4|MSP|INL|
|1182015|   -5|MSP|INL|
|1191925|   -7|MSP|INL|
|1201115|   -6|MSP|INL|
|1211115|    0|MSP|INL|
+-------+-----+---+---+
only showing top 20 rows



In [42]:
tripGraphPrime.vertices.show()

+---+----------------+-----+-------+
| id|            City|State|Country|
+---+----------------+-----+-------+
|FAT|          Fresno|   CA|    USA|
|CMH|        Columbus|   OH|    USA|
|PHX|         Phoenix|   AZ|    USA|
|PAH|         Paducah|   KY|    USA|
|COS|Colorado Springs|   CO|    USA|
|MYR|    Myrtle Beach|   SC|    USA|
|RNO|            Reno|   NV|    USA|
|SRQ|        Sarasota|   FL|    USA|
|VLD|        Valdosta|   GA|    USA|
|PSC|           Pasco|   WA|    USA|
|BPT|        Beaumont|   TX|    USA|
|CAE|        Columbia|   SC|    USA|
|LAX|     Los Angeles|   CA|    USA|
|DAY|          Dayton|   OH|    USA|
|AVP|    Wilkes-Barre|   PA|    USA|
|MFR|         Medford|   OR|    USA|
|JFK|        New York|   NY|    USA|
|LAS|       Las Vegas|   NV|    USA|
|BNA|       Nashville|   TN|    USA|
|CLT|       Charlotte|   NC|    USA|
+---+----------------+-----+-------+
only showing top 20 rows



In [23]:
tripGraph.vertices.show(10)

+---+----------------+-----+-------+
| id|            City|State|Country|
+---+----------------+-----+-------+
|FAT|          Fresno|   CA|    USA|
|CMH|        Columbus|   OH|    USA|
|PHX|         Phoenix|   AZ|    USA|
|PAH|         Paducah|   KY|    USA|
|COS|Colorado Springs|   CO|    USA|
|MYR|    Myrtle Beach|   SC|    USA|
|RNO|            Reno|   NV|    USA|
|SRQ|        Sarasota|   FL|    USA|
|VLD|        Valdosta|   GA|    USA|
|PSC|           Pasco|   WA|    USA|
+---+----------------+-----+-------+
only showing top 10 rows



In [25]:
tripGraph.edges.show(10)

+-------+-----+---+---+-------------------+---------+
| tripid|delay|src|dst|           city_dst|state_dst|
+-------+-----+---+---+-------------------+---------+
|1011111|   -5|MSP|INL|International Falls|       MN|
|1021111|    7|MSP|INL|International Falls|       MN|
|1031111|    0|MSP|INL|International Falls|       MN|
|1041925|    0|MSP|INL|International Falls|       MN|
|1061115|   33|MSP|INL|International Falls|       MN|
|1071115|   23|MSP|INL|International Falls|       MN|
|1081115|   -9|MSP|INL|International Falls|       MN|
|1091115|   11|MSP|INL|International Falls|       MN|
|1101115|   -3|MSP|INL|International Falls|       MN|
|1112015|   -7|MSP|INL|International Falls|       MN|
+-------+-----+---+---+-------------------+---------+
only showing top 10 rows



## 2. Simple Queries
Let's start with a set of simple graph queries to understand flight performance and departure delays.

#### 2.1 Determine the number of airports and trips

In [22]:
print("Airports: %d" % tripGraph.vertices.count())
print("Trips: %d" % tripGraph.edges.count())

Airports: 279
Trips: 1361141


#### 2.2 Determining the longest delay in this dataset

In [26]:
tripGraph.edges.groupBy().max("delay").show()

+----------+
|max(delay)|
+----------+
|      1642|
+----------+



In [27]:
# Finding the longest Delay
longestDelay = tripGraph.edges.groupBy().max("delay")
display(longestDelay)

DataFrame[max(delay): int]

In [29]:
longestDelay.show()

+----------+
|max(delay)|
+----------+
|      1642|
+----------+



#### 2.3 Determining the number of delayed vs. on-time / early flights

In [31]:
# Determining number of on-time / early flights vs. delayed flights
print("On-time / Early Flights: %d" % tripGraph.edges.filter("delay <= 0").count())
print("Delayed Flights: %d" % tripGraph.edges.filter("delay > 0").count())

On-time / Early Flights: 780469
Delayed Flights: 580672


#### 2.4 What flights departing SEA are most likely to have significant delays
Note: a delay <= 0 means the flight left on time or early. The query below averages the delay over delayed flights only (delay > 0), per destination.

In [32]:
tripGraph.edges\
  .filter("src = 'SEA' and delay > 0")\
  .groupBy("src", "dst")\
  .avg("delay")\
  .sort(desc("avg(delay)"))\
  .show(5)
  

+---+---+------------------+
|src|dst|        avg(delay)|
+---+---+------------------+
|SEA|PHL|55.666666666666664|
|SEA|COS| 43.53846153846154|
|SEA|FAT| 43.03846153846154|
|SEA|LGB| 39.39705882352941|
|SEA|IAD|37.733333333333334|
+---+---+------------------+
only showing top 5 rows



In [33]:
display(tripGraph.edges.filter("src = 'SEA' and delay > 0").groupBy("src", "dst").avg("delay").sort(desc("avg(delay)")))

DataFrame[src: string, dst: string, avg(delay): double]
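The filter / groupBy / avg / sort chain above is equivalent to the following plain-Python sketch. The flight records here are made up for illustration and are not taken from the dataset. `avg_delay_by_dst` is a hypothetical helper.

```python
from collections import defaultdict


def avg_delay_by_dst(edges, src="SEA"):
    """Average delay per destination over delayed flights from `src`, highest first."""
    totals, counts = defaultdict(int), defaultdict(int)
    for e in edges:
        # Same predicate as filter("src = 'SEA' and delay > 0")
        if e["src"] == src and e["delay"] > 0:
            totals[e["dst"]] += e["delay"]
            counts[e["dst"]] += 1
    # Equivalent of groupBy("src", "dst").avg("delay") for a single src
    averages = {dst: totals[dst] / counts[dst] for dst in totals}
    # Equivalent of sort(desc("avg(delay)"))
    return sorted(averages.items(), key=lambda kv: kv[1], reverse=True)


# Made-up sample flights, not taken from the dataset
edges = [
    {"src": "SEA", "dst": "PHL", "delay": 50},
    {"src": "SEA", "dst": "PHL", "delay": 60},
    {"src": "SEA", "dst": "PHL", "delay": -3},   # on time: excluded
    {"src": "SEA", "dst": "OAK", "delay": 10},
    {"src": "SFO", "dst": "PHL", "delay": 100},  # different origin: excluded
]
print(avg_delay_by_dst(edges))  # [('PHL', 55.0), ('OAK', 10.0)]
```

Averaging only the delayed flights measures how long delays are when they happen, not how often they happen. A route with a few very late flights can therefore top this list.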

#### 2.5 What destinations tend to have delays

In [34]:
# In Databricks, after displaying tripDelays, use Plot Options to set `state_dst` as the key.
tripDelays = tripGraph.edges.filter("delay > 0")
display(tripDelays)

DataFrame[tripid: int, delay: int, src: string, dst: string, city_dst: string, state_dst: string]

In [35]:
tripDelays.show()

+-------+-----+---+---+-------------------+---------+
| tripid|delay|src|dst|           city_dst|state_dst|
+-------+-----+---+---+-------------------+---------+
|1021111|    7|MSP|INL|International Falls|       MN|
|1061115|   33|MSP|INL|International Falls|       MN|
|1071115|   23|MSP|INL|International Falls|       MN|
|1091115|   11|MSP|INL|International Falls|       MN|
|1171115|    4|MSP|INL|International Falls|       MN|
|2091925|    1|MSP|INL|International Falls|       MN|
|2152015|   16|MSP|INL|International Falls|       MN|
|2161925|  169|MSP|INL|International Falls|       MN|
|2171115|   27|MSP|INL|International Falls|       MN|
|2181115|   96|MSP|INL|International Falls|       MN|
|2281115|    5|MSP|INL|International Falls|       MN|
|3031115|   17|MSP|INL|International Falls|       MN|
|3171115|   25|MSP|INL|International Falls|       MN|
|3181115|    2|MSP|INL|International Falls|       MN|
|3271115|    9|MSP|INL|International Falls|       MN|
|1011646|   71|IAH|MSY|     

#### 2.6 What destinations tend to have significant delays departing from SEA

In [43]:
# Flights from Seattle (SEA) with individual delays > 100 minutes; plotting these by `state_dst` shows which states accumulate the longest delays
display(tripGraph.edges.filter("src = 'SEA' and delay > 100"))

DataFrame[tripid: int, delay: int, src: string, dst: string, city_dst: string, state_dst: string]

In [44]:
tripGraph.edges.filter("src = 'SEA' and delay > 100").show()

+-------+-----+---+---+-------------+---------+
| tripid|delay|src|dst|     city_dst|state_dst|
+-------+-----+---+---+-------------+---------+
|3201938|  108|SEA|BUR|      Burbank|       CA|
|3201655|  107|SEA|SNA|Orange County|       CA|
|1011950|  123|SEA|OAK|      Oakland|       CA|
|1021950|  194|SEA|OAK|      Oakland|       CA|
|1021615|  317|SEA|OAK|      Oakland|       CA|
|1021755|  385|SEA|OAK|      Oakland|       CA|
|1031950|  283|SEA|OAK|      Oakland|       CA|
|1031615|  364|SEA|OAK|      Oakland|       CA|
|1031325|  130|SEA|OAK|      Oakland|       CA|
|1061755|  107|SEA|OAK|      Oakland|       CA|
|1081330|  118|SEA|OAK|      Oakland|       CA|
|2282055|  150|SEA|OAK|      Oakland|       CA|
|3061600|  130|SEA|OAK|      Oakland|       CA|
|3170815|  199|SEA|DCA|Washington DC|     null|
|2151845|  128|SEA|KTN|    Ketchikan|       AK|
|2281845|  104|SEA|KTN|    Ketchikan|       AK|
|3130720|  117|SEA|KTN|    Ketchikan|       AK|
|1011411|  177|SEA|IAH|      Houston|   

## 3. Vertex Degrees
* `inDegrees`: Incoming connections to the airport
* `outDegrees`: Outgoing connections from the airport 
* `degrees`: Total connections to and from the airport

The cells below review these degree properties of the graph to understand the incoming and outgoing connections between airports.
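Because every flight is its own edge, these degrees count flights rather than distinct neighbouring airports. The following plain-Python sketch, which uses `collections.Counter` on made-up `(src, dst)` pairs, shows how the three quantities relate. `vertex_degrees` is a hypothetical helper.

```python
from collections import Counter


def vertex_degrees(edges):
    """Return (inDegree, outDegree, degree) counters for a list of (src, dst) flights."""
    # inDegree: number of flights arriving at each airport
    in_deg = Counter(dst for _, dst in edges)
    # outDegree: number of flights leaving each airport
    out_deg = Counter(src for src, _ in edges)
    # degree: incoming plus outgoing flights
    degree = in_deg + out_deg
    return in_deg, out_deg, degree


edges = [("SEA", "SFO"), ("SEA", "SFO"), ("SFO", "SEA"), ("SFO", "LAX")]
in_deg, out_deg, degree = vertex_degrees(edges)
print(degree.most_common())  # [('SFO', 4), ('SEA', 3), ('LAX', 1)]
```

The tables below are consistent with degree = inDegree + outDegree. For example, ATL has 89,633 + 90,141 = 179,774.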

In [46]:
# Degrees
#  The degree (number of incoming plus outgoing connections) of each airport within this sample dataset
display(tripGraph.degrees.sort(desc("degree")).limit(20))

DataFrame[id: string, degree: int]

In [47]:
tripGraph.degrees.sort(desc("degree")).limit(20).show()

+---+------+
| id|degree|
+---+------+
|ATL|179774|
|DFW|133966|
|ORD|125405|
|LAX|106853|
|DEN|103699|
|IAH| 85685|
|PHX| 79672|
|SFO| 77635|
|LAS| 66101|
|CLT| 56103|
|EWR| 54407|
|MCO| 54300|
|LGA| 50927|
|SLC| 50780|
|BOS| 49936|
|DTW| 46705|
|MSP| 46235|
|SEA| 45816|
|JFK| 43661|
|BWI| 42526|
+---+------+



In [48]:
# inDegrees
#  The in-degree (number of incoming connections) of each airport within this sample dataset
# display(tripGraph.inDegrees.sort(desc("inDegree")).limit(20))
tripGraph.inDegrees.sort(desc("inDegree")).limit(20).show()

+---+--------+
| id|inDegree|
+---+--------+
|ATL|   89633|
|DFW|   65767|
|ORD|   61654|
|LAX|   53184|
|DEN|   50738|
|IAH|   42512|
|PHX|   39619|
|SFO|   38641|
|LAS|   32994|
|CLT|   28044|
|EWR|   27201|
|MCO|   27071|
|LGA|   25469|
|SLC|   25169|
|BOS|   24973|
|DTW|   23297|
|SEA|   22906|
|MSP|   22372|
|JFK|   21832|
|BWI|   21262|
+---+--------+



In [49]:
# outDegrees
#  The out-degree (number of outgoing connections) of each airport within this sample dataset
# display(tripGraph.outDegrees.sort(desc("outDegree")).limit(20))
tripGraph.outDegrees.sort(desc("outDegree")).limit(20).show()

+---+---------+
| id|outDegree|
+---+---------+
|ATL|    90141|
|DFW|    68199|
|ORD|    63751|
|LAX|    53669|
|DEN|    52961|
|IAH|    43173|
|PHX|    40053|
|SFO|    38994|
|LAS|    33107|
|CLT|    28059|
|MCO|    27229|
|EWR|    27206|
|SLC|    25611|
|LGA|    25458|
|BOS|    24963|
|MSP|    23863|
|DTW|    23408|
|SEA|    22910|
|JFK|    21829|
|BWI|    21264|
+---+---------+



## 4. City / Flight Relationships through Motif Finding
To more easily understand the complex relationship of city airports and their flights with each other, we can use motifs to find patterns of airports (i.e. vertices) connected by flights (i.e. edges). The result is a DataFrame in which the column names are given by the motif keys.

#### 4.1 What delays might we blame on SFO

In [50]:
# Using tripGraphPrime (fewer edge columns) to more easily display:
#   - the edges (ab, bc) and the airports (a, b, c), where SFO is the connecting airport (b)
#   - only pairs where flight ab (into SFO) occurred before flight bc (out of SFO)
#   - only pairs where at least one of the two flights was delayed by more than 500 minutes
#   - Note: tripid was generated from the flight's date and time in the format MMDDHHMM, converted to int
#       - Therefore bc.tripid < ab.tripid + 10000 means the second flight (bc) occurred within about a day of the first flight (ab)
# Note: in reality, we would need to be more careful when linking trips ab and bc.
motifs = tripGraphPrime.find("(a)-[ab]->(b); (b)-[bc]->(c)")\
  .filter("(b.id = 'SFO') and (ab.delay > 500 or bc.delay > 500) and bc.tripid > ab.tripid and bc.tripid < ab.tripid + 10000")
display(motifs)

DataFrame[a: struct<id:string,City:string,State:string,Country:string>, ab: struct<tripid:int,delay:int,src:string,dst:string>, b: struct<id:string,City:string,State:string,Country:string>, bc: struct<tripid:int,delay:int,src:string,dst:string>, c: struct<id:string,City:string,State:string,Country:string>]

In [51]:
motifs.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                   a|                  ab|                   b|                  bc|                   c|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|[MSY, New Orleans...|[1011751, -4, MSY...|[SFO, San Francis...|[1021507, 536, SF...|[JFK, New York, N...|
|[MSY, New Orleans...|[1201725, 2, MSY,...|[SFO, San Francis...|[1211508, 593, SF...|[JFK, New York, N...|
|[MSY, New Orleans...|[2091725, 87, MSY...|[SFO, San Francis...|[2092110, 740, SF...|[MIA, Miami, FL, ...|
|[MSY, New Orleans...|[2091725, 87, MSY...|[SFO, San Francis...|[2092230, 636, SF...|[JFK, New York, N...|
|[MSY, New Orleans...|[2121725, 15, MSY...|[SFO, San Francis...|[2131420, 504, SF...|[SAN, San Diego, ...|
|[BUR, Burbank, CA...|[1011828, 88, BUR...|[SFO, San Francis...|[1021507, 536, SF...|[JFK, New York, N...|
|[BUR, Burbank, CA...|[1020941, -17, 
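The motif `(a)-[ab]->(b); (b)-[bc]->(c)` with the filter above amounts to pairing every flight into SFO with every flight out of SFO and keeping the pairs that pass the time-window and delay tests. Below is a plain-Python sketch of that logic, not the GraphFrames implementation. `connections_via` is a hypothetical helper. The first two records reuse values from the first result row above (MSY to SFO, tripid 1011751, then SFO to JFK, tripid 1021507 with a 536-minute delay). The other two records are made up.

```python
def connections_via(edges, hub="SFO", min_delay=500):
    """Find (a)-[ab]->(hub)-[bc]->(c) flight pairs with a long delay on either leg."""
    into_hub = [e for e in edges if e["dst"] == hub]
    out_of_hub = [e for e in edges if e["src"] == hub]
    matches = []
    for ab in into_hub:
        for bc in out_of_hub:
            # tripid is MMDDHHMM as an int, so adding 10000 moves one day forward
            within_a_day = ab["tripid"] < bc["tripid"] < ab["tripid"] + 10000
            delayed = ab["delay"] > min_delay or bc["delay"] > min_delay
            if within_a_day and delayed:
                matches.append((ab["src"], ab["tripid"], bc["tripid"], bc["dst"]))
    return matches


edges = [
    {"tripid": 1011751, "delay": -4, "src": "MSY", "dst": "SFO"},
    {"tripid": 1021507, "delay": 536, "src": "SFO", "dst": "JFK"},  # next day, 536 min late
    {"tripid": 1031507, "delay": 600, "src": "SFO", "dst": "MIA"},  # more than a day later
    {"tripid": 1011900, "delay": 5, "src": "SFO", "dst": "SAN"},    # no leg delayed > 500
]
print(connections_via(edges))  # [('MSY', 1011751, 1021507, 'JFK')]
```

The `+ 10000` window does not handle month boundaries. A flight at 23:00 on 31 January (1312300) followed by one at 01:00 on 1 February (2010100) falls outside it. This is one reason the notebook warns that linking trips would need more care in practice.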

## 5. Determining Airport Ranking using PageRank
There is a large number of flights and connections between the airports in this departure delays dataset. The `pageRank` algorithm iteratively propagates scores along the edges, so an airport scores highly when many flights arrive from other highly ranked airports. The result is a rough estimate of how important each airport is.
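To show the idea, here is a textbook PageRank power iteration in plain Python. The damping factor is `1 - resetProbability`, and the loop runs for a fixed number of iterations, like `maxIter`. This is a sketch, not the GraphFrames implementation. Its ranks sum to 1, whereas the notebook's values (18.9 for ATL, for example) use a different scaling, so only the ordering is comparable. In this sketch each flight is a separate edge, and airports without departures spread their rank evenly. The small hub-and-spoke graph is made up.

```python
def pagerank(edges, reset_probability=0.15, max_iter=20):
    """Textbook PageRank by power iteration on a directed multigraph; ranks sum to 1."""
    nodes = sorted({airport for edge in edges for airport in edge})
    n = len(nodes)
    out_deg = {v: 0 for v in nodes}
    for src, _ in edges:
        out_deg[src] += 1
    damping = 1.0 - reset_probability
    rank = {v: 1.0 / n for v in nodes}
    for _ in range(max_iter):  # fixed number of iterations, like maxIter
        # Every airport receives the same "random restart" share
        new_rank = {v: reset_probability / n for v in nodes}
        # Each flight passes on an equal share of its origin's rank
        for src, dst in edges:
            new_rank[dst] += damping * rank[src] / out_deg[src]
        # Airports with no departing flights spread their rank evenly
        dangling = sum(rank[v] for v in nodes if out_deg[v] == 0)
        for v in nodes:
            new_rank[v] += damping * dangling / n
        rank = new_rank
    return rank


edges = [("ABQ", "DEN"), ("BIL", "DEN"), ("COS", "DEN"),
         ("DEN", "ABQ"), ("DEN", "BIL"), ("DEN", "COS"), ("DEN", "SLC")]
ranks = pagerank(edges)
print(max(ranks, key=ranks.get))  # DEN
```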

In [52]:
# Determining Airport ranking of importance using `pageRank`
ranks = tripGraph.pageRank(resetProbability=0.15, maxIter=5)
# display(ranks.vertices.orderBy(ranks.vertices.pagerank.desc()).limit(10))
ranks.vertices.orderBy(ranks.vertices.pagerank.desc()).limit(10).show()

+---+--------------+-----+-------+------------------+
| id|          City|State|Country|          pagerank|
+---+--------------+-----+-------+------------------+
|ATL|       Atlanta|   GA|    USA|18.910104616729814|
|DFW|        Dallas|   TX|    USA|13.699227467378964|
|ORD|       Chicago|   IL|    USA|13.163049993795985|
|DEN|        Denver|   CO|    USA| 9.723388283811563|
|LAX|   Los Angeles|   CA|    USA| 8.703656827807166|
|IAH|       Houston|   TX|    USA| 7.991324463091128|
|SFO| San Francisco|   CA|    USA| 6.903242998287933|
|PHX|       Phoenix|   AZ|    USA| 6.505886984498643|
|SLC|Salt Lake City|   UT|    USA| 5.799587684561128|
|LAS|     Las Vegas|   NV|    USA|  5.25359244560915|
+---+--------------+-----+-------+------------------+



## 6. Most popular flights (single city hops)
Using `tripGraph`, we can quickly determine the most popular single-hop (direct) routes.
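The grouping and ranking below can be sketched in plain Python with `collections.Counter`. Directions are kept separate, so SFO to LAX and LAX to SFO are different routes, as in the results table. There is one difference from the notebook: `func.count("delay")` counts only non-null `delay` values, whereas this sketch counts every flight record. The sample flights and the helper `top_routes` are hypothetical.

```python
from collections import Counter


def top_routes(edges, k=10):
    """Most frequent directed (src, dst) routes among a list of flights."""
    # Like groupBy("src", "dst") followed by a count per group
    counts = Counter(edges)
    # Like orderBy(trips.desc()).limit(k)
    return counts.most_common(k)


edges = [("SFO", "LAX")] * 3 + [("LAX", "SFO")] * 2 + [("LAS", "LAX")]
print(top_routes(edges, k=2))  # [(('SFO', 'LAX'), 3), (('LAX', 'SFO'), 2)]
```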

In [53]:
# Determine the most popular flights (single city hops)
import pyspark.sql.functions as func
topTrips = tripGraph \
  .edges \
  .groupBy("src", "dst") \
  .agg(func.count("delay").alias("trips")) 

In [54]:
# Show the top 10 most popular flights (single city hops)
# display(topTrips.orderBy(topTrips.trips.desc()).limit(10))
topTrips.orderBy(topTrips.trips.desc()).limit(10).show()

+---+---+-----+
|src|dst|trips|
+---+---+-----+
|SFO|LAX| 3232|
|LAX|SFO| 3198|
|LAS|LAX| 3016|
|LAX|LAS| 2964|
|JFK|LAX| 2720|
|LAX|JFK| 2719|
|ATL|LGA| 2501|
|LGA|ATL| 2500|
|LAX|PHX| 2394|
|PHX|LAX| 2387|
+---+---+-----+



## 7. Top Transfer Cities
Many airports are used as transfer points rather than final destinations. A simple way to gauge this is the ratio of inDegree (the number of flights arriving at the airport) to outDegree (the number of flights leaving the airport). Values close to 1 may indicate many transfers. Values < 1 indicate more outgoing than incoming flights, and values > 1 indicate more incoming than outgoing flights. Note that this simple calculation does not take the timing or scheduling of flights into account, only the aggregate counts within the dataset.
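Here is the same classification as a plain-Python sketch, using the 0.9 / 1.1 thresholds from the cells below. SQL `between` is inclusive, so the bounds count as transfer airports. The MSP and ATL counts come from the inDegree / outDegree tables above. `XYZ` is a made-up airport. The key intersection mimics the inner join, so an airport missing from either table is skipped. `classify_airports` is a hypothetical helper.

```python
def classify_airports(in_deg, out_deg, low=0.9, high=1.1):
    """Split airports by inDegree / outDegree ratio into transfer and non-transfer."""
    transfer, non_transfer = {}, {}
    for airport in in_deg.keys() & out_deg.keys():  # like the inner join on id
        ratio = in_deg[airport] / out_deg[airport]
        if low <= ratio <= high:  # like "degreeRatio between 0.9 and 1.1"
            transfer[airport] = ratio
        else:  # like "degreeRatio < .9 or degreeRatio > 1.1"
            non_transfer[airport] = ratio
    return transfer, non_transfer


in_deg = {"MSP": 22372, "ATL": 89633, "XYZ": 10}
out_deg = {"MSP": 23863, "ATL": 90141, "XYZ": 40}
transfer, non_transfer = classify_airports(in_deg, out_deg)
print(sorted(transfer), non_transfer)  # ['ATL', 'MSP'] {'XYZ': 0.25}
```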

In [55]:
# Calculate the inDeg (flights into the airport) and outDeg (flights leaving the airport)
inDeg = tripGraph.inDegrees
outDeg = tripGraph.outDegrees

# Calculate the degreeRatio (inDeg/outDeg)
degreeRatio = inDeg.join(outDeg, inDeg.id == outDeg.id) \
  .drop(outDeg.id) \
  .selectExpr("id", "double(inDegree)/double(outDegree) as degreeRatio") \
  .cache()

# Join back to the `airports` DataFrame (instead of registering temp table as above)
nonTransferAirports = degreeRatio.join(airports, degreeRatio.id == airports.IATA) \
  .selectExpr("id", "city", "degreeRatio") \
  .filter("degreeRatio < .9 or degreeRatio > 1.1")


In [56]:
# List out the city airports which have abnormal degree ratios.
display(nonTransferAirports)

DataFrame[id: string, city: string, degreeRatio: double]

In [57]:
nonTransferAirports.show()

+---+-----------+-------------------+
| id|       city|        degreeRatio|
+---+-----------+-------------------+
|GFK|Grand Forks| 1.3333333333333333|
|FAI|  Fairbanks| 1.1232686980609419|
|OME|       Nome| 0.5084745762711864|
|BRW|     Barrow|0.28651685393258425|
+---+-----------+-------------------+



In [58]:
# Join back to the `airports` DataFrame (instead of registering temp table as above)
transferAirports = degreeRatio.join(airports, degreeRatio.id == airports.IATA) \
  .selectExpr("id", "city", "degreeRatio") \
  .filter("degreeRatio between 0.9 and 1.1")


In [59]:
# List 10 transfer city airports (degreeRatio between 0.9 and 1.1), ordered by ascending degreeRatio
# display(transferAirports.orderBy("degreeRatio").limit(10))
transferAirports.orderBy("degreeRatio").limit(10).show()

+---+--------------+------------------+
| id|          city|       degreeRatio|
+---+--------------+------------------+
|MSP|   Minneapolis|0.9375183338222353|
|DEN|        Denver| 0.958025717037065|
|DFW|        Dallas| 0.964339653074092|
|ORD|       Chicago|0.9671063983310065|
|SLC|Salt Lake City|0.9827417906368358|
|IAH|       Houston|0.9846895050147083|
|PHX|       Phoenix|0.9891643572266746|
|OGG| Kahului, Maui|0.9898718478710211|
|HNL|Honolulu, Oahu| 0.990535889872173|
|SFO| San Francisco|0.9909473252295224|
+---+--------------+------------------+



## 8. Breadth First Search 
Breadth-first search (BFS) traverses the graph level by level from the starting vertices. It therefore finds the paths with the fewest edges (i.e., flights) to the desired vertices (i.e., airports). Let's use it to find the fewest connections between cities in the dataset. Note that these examples do not take time or distance into account, only the hops between cities.
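The level-by-level idea can be sketched in plain Python with a queue. `maxPathLength` is mirrored by a hypothetical `max_path_length` argument. Unlike `tripGraph.bfs`, which returns every shortest path as rows of vertices and edges, this sketch returns a single shortest route as a list of airports, or `None`. The helper `fewest_hops` and the sample routes are made up for illustration.

```python
from collections import deque


def fewest_hops(edges, start, goal, max_path_length=None):
    """Return one shortest route (list of airports) from start to goal, or None."""
    neighbours = {}
    for src, dst in edges:
        neighbours.setdefault(src, set()).add(dst)
    parent = {start: None}  # also marks airports already visited
    frontier = deque([(start, 0)])
    while frontier:
        airport, depth = frontier.popleft()
        if airport == goal:
            path = []
            while airport is not None:  # walk back to the start
                path.append(airport)
                airport = parent[airport]
            return path[::-1]
        if max_path_length is not None and depth >= max_path_length:
            continue  # do not expand beyond the allowed number of hops
        for nxt in sorted(neighbours.get(airport, ())):
            if nxt not in parent:
                parent[nxt] = airport
                frontier.append((nxt, depth + 1))
    return None


edges = [("SEA", "SFO"), ("SFO", "BOS"), ("BOS", "BUF"), ("SFO", "JFK"), ("JFK", "BUF")]
print(fewest_hops(edges, "SFO", "BUF"))                     # ['SFO', 'BOS', 'BUF']
print(fewest_hops(edges, "SFO", "BUF", max_path_length=1))  # None
```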

In [60]:
# Example 1: Direct Seattle to San Francisco 
filteredPaths = tripGraph.bfs(
  fromExpr = "id = 'SEA'",
  toExpr = "id = 'SFO'",
  maxPathLength = 1)


In [61]:
# display(filteredPaths)
filteredPaths.show()

+--------------------+--------------------+--------------------+
|                from|                  e0|                  to|
+--------------------+--------------------+--------------------+
|[SEA, Seattle, WA...|[1010710, 31, SEA...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[1012125, -4, SEA...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[1011840, -5, SEA...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[1010610, -4, SEA...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[1011230, -2, SEA...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[1010955, -6, SEA...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[1011100, 2, SEA,...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[1011405, 0, SEA,...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[1020710, -1, SEA...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[1022125, -4, SEA...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[1021840, -5, SEA...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[1020610, 3, SEA,...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[10

As you can see, there are a number of direct flights between Seattle and San Francisco.

In [62]:
# Example 2: Direct flights from San Francisco to Buffalo
filteredPaths = tripGraph.bfs(
  fromExpr = "id = 'SFO'",
  toExpr = "id = 'BUF'",
  maxPathLength = 1)
# display(filteredPaths)

In [63]:
filteredPaths.show()

+---+----+-----+-------+
| id|City|State|Country|
+---+----+-----+-------+
+---+----+-----+-------+



But there are no direct flights between San Francisco and Buffalo.

In [64]:
# Example 2a: Flying from San Francisco to Buffalo
filteredPaths = tripGraph.bfs(
  fromExpr = "id = 'SFO'",
  toExpr = "id = 'BUF'",
  maxPathLength = 2)
# display(filteredPaths)

In [65]:
filteredPaths.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                from|                  e0|                  v1|                  e1|                  to|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|[SFO, San Francis...|[1010700, 0, SFO,...|[BOS, Boston, MA,...|[1010635, -6, BOS...|[BUF, Buffalo, NY...|
|[SFO, San Francis...|[1010700, 0, SFO,...|[BOS, Boston, MA,...|[1011059, 13, BOS...|[BUF, Buffalo, NY...|
|[SFO, San Francis...|[1010700, 0, SFO,...|[BOS, Boston, MA,...|[1011427, 19, BOS...|[BUF, Buffalo, NY...|
|[SFO, San Francis...|[1010700, 0, SFO,...|[BOS, Boston, MA,...|[1020635, -4, BOS...|[BUF, Buffalo, NY...|
|[SFO, San Francis...|[1010700, 0, SFO,...|[BOS, Boston, MA,...|[1021059, 0, BOS,...|[BUF, Buffalo, NY...|
|[SFO, San Francis...|[1010700, 0, SFO,...|[BOS, Boston, MA,...|[1021427, 194, BO...|[BUF, Buffalo, NY...|
|[SFO, San Francis...|[1010700, 0, SF

There are, however, one-stop connections from San Francisco to Buffalo (for example, via Boston in the rows above). But which layover airports are the most popular between `SFO` and `BUF`?

In [66]:
# Display most popular layover cities by descending count
# display(filteredPaths.groupBy("v1.id", "v1.City").count().orderBy(desc("count")).limit(10))
filteredPaths.groupBy("v1.id", "v1.City").count().orderBy(desc("count")).limit(10).show()

+---+---------------+-------+
| id|           City|  count|
+---+---------------+-------+
|JFK|       New York|1233728|
|ORD|        Chicago|1088283|
|ATL|        Atlanta| 285383|
|LAS|      Las Vegas| 275091|
|BOS|         Boston| 238576|
|CLT|      Charlotte| 143444|
|PHX|        Phoenix| 104580|
|FLL|Fort Lauderdale|  96317|
|EWR|         Newark|  95370|
|MCO|        Orlando|  88615|
+---+---------------+-------+
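The counts above (over a million for JFK) count combinations of flights rather than travellers or realistic connections. Each row of the BFS result pairs one first-leg flight with one second-leg flight, regardless of timing. Assuming every such pairing appears once, the count for a layover X is simply (flights SFO to X) × (flights X to BUF), as this plain-Python sketch shows. The helper `layover_pair_counts` and the sample flights are hypothetical.

```python
from collections import Counter


def layover_pair_counts(edges, origin, destination):
    """Number of (first leg, second leg) flight combinations per layover airport."""
    # Flights on the first leg (origin -> X) and the second leg (X -> destination)
    first_leg = Counter(dst for src, dst in edges if src == origin)
    second_leg = Counter(src for src, dst in edges if dst == destination)
    # Every first-leg flight pairs with every second-leg flight through X
    return Counter({x: first_leg[x] * second_leg[x]
                    for x in first_leg.keys() & second_leg.keys()
                    if x not in (origin, destination)})


edges = ([("SFO", "BOS")] * 2 + [("BOS", "BUF")] * 3 +
         [("SFO", "JFK")] + [("JFK", "BUF")] * 4 + [("SFO", "LAX")] * 5)
print(layover_pair_counts(edges, "SFO", "BUF").most_common())  # [('BOS', 6), ('JFK', 4)]
```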



## 9. Loading the D3 Visualization
Using the airports D3 visualization to visualize airports and flight paths

See the notebook listed under References at the top of this document.