# Graph Analysis using Spark - GraphFrames

#### Graphs are data structures composed of nodes, or vertices, which are arbitrary objects, and edges that define relationships between these nodes. 

#### Graph analytics is the process of analyzing these relationships.

An example graph might be your friend group. 
<br>In the context of graph analytics, each vertex or node would represent a person, and each edge would represent a relationship.

#### Edges and vertices in graphs can also have data associated with them.

In our friend example, the weight of the edge might represent the intimacy between different friends; 
<br>acquaintances would have low-weight edges between them, 
<br>while married individuals would have edges with large weights. 
<br>We could set this value by looking at communication frequency between nodes and weighting the edges accordingly. 
<br>Each vertex (person) might also have data such as a name.
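The friend-group example above can be sketched in a few lines of plain Python. The names, IDs and weights below are made up for illustration, and the dictionary/list layout is just one possible way to hold vertex and edge data, not how Spark stores a graph.

```python
# Vertices: one record per person, with attached data (here just a name).
vertices = {
    "a": {"name": "Alice"},
    "b": {"name": "Bob"},
    "c": {"name": "Carol"},
}

# Edges: (src, dst, weight). A larger weight stands for a closer relationship.
edges = [
    ("a", "b", 0.9),  # e.g. married
    ("a", "c", 0.2),  # e.g. acquaintances
    ("b", "c", 0.4),
]


def closest_friend(person_id):
    """Return the name of the neighbour joined to person_id by the heaviest edge."""
    best_weight, best_id = max(
        (weight, dst if src == person_id else src)
        for src, dst, weight in edges
        if person_id in (src, dst)
    )
    return vertices[best_id]["name"]


print(closest_friend("a"))
print(closest_friend("c"))
```

Here the edges are treated as undirected (friendship goes both ways); the flight graph built later in this notebook is directed.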

#### Graphs are a natural way of describing relationships and many different problem sets.

#### Some business use cases could be
Fraud Detection & Analytics - Spot Fraud Rings in Their Tracks.
<br>Motif finding - Search for recurring structural patterns in a graph.
<br>Determining the importance of papers in bibliographic networks (i.e., which papers are most referenced).
<br>Ranking web pages, as Google famously used the PageRank algorithm to do.
<br>Identity & Access Management - Track Roles, Groups and Assets like Never Before.
<br>Knowledge Graph - Augment Your Knowledge Graph with Highly Contextual Search Results.
<br>Master Data Management - Graphs Provide a 360° View of Your Data.
<br>Network and Database Infrastructure Monitoring for IT Operations - Manage and Monitor Complex Networks with Real-Time Insights.
<br>And many more.

### SPARK GraphX and GraphFrames
Spark provides several ways of working in this analytics paradigm.
<br>Spark has long contained an RDD-based library for performing graph processing: GraphX.
<br>This provided a very low-level interface that was extremely powerful but, just like RDDs, wasn’t easy to use or optimize.
<br>GraphX remains a core part of Spark.
<br>The developers of Spark have more recently created a next-generation graph analytics library on Spark: GraphFrames.
<br>GraphFrames extends GraphX to provide a DataFrame API and support for Spark’s different language bindings,
<br>so that users of Python can take advantage of the scalability of the tool.

<br>**GraphFrames** is currently available as a Spark package, 
<br>an external package that you need to load when you start up your Spark application, 
<br>but may be merged into the core of Spark in the future.

<br>**HOW DOES GRAPHFRAMES COMPARE TO GRAPH DATABASES?**
<br>Spark is not a database.
<br>Spark is a distributed computation engine, but it does not store data long-term or perform transactions.
<br>You can build a graph computation on top of Spark, but that’s fundamentally different from a database.
<br>GraphFrames can scale to much larger workloads than many graph databases and 
<br>performs well for analytics but does not support transactional processing and serving.

Before starting the Jupyter notebook, pass the external dependency to the PySpark kernel through environment variables:
<br>export PACKAGES="graphframes:graphframes:0.5.0-spark2.1-s_2.11"
<br>export PYSPARK_SUBMIT_ARGS="--packages ${PACKAGES} pyspark-shell"

## Problem Statement
There are two datasets: airports.dat (data about all the airports) and departureDelays.csv (flights with their departure delays), obtained from OpenFlights and the US DoT respectively.

**Analyze these datasets by**
<br>Creating a graph structure from the airports and departure delays data, where
each airport is a node and each trip between two airports is an edge (relation/connection).

Upon building the graph structure, analyze the graph to find
<br>1. Delayed vs On-Time flights
<br>2. Which flights departing from a specific location are most likely to have significant delays?
<br>3. Which destinations tend to have delays?
<br>4. Which destinations tend to have significant delays departing from a specific location?
<br>5. Degree-wise (InDegree, OutDegree) analysis.
<br>6. Motif finding.
<br>7. Determine the influential airports using PageRank.
<br>8. Find most popular trips.
<br>9. Find transfer cities/hubs.
<br>10. Use Breadth First Search to find the connections between two cities with 1 Hop, 2 Hops...

### Dataset Descriptions
#### Airports data - airports.dat - 
#### OpenFlights: Airport, airline and route data : https://openflights.org/data.html
<br>**Airport ID** Unique OpenFlights identifier for this airport.  
<br>**Name** Name of airport. May or may not contain the City name. 
<br>**City** Main city served by airport. May be spelled differently from Name. 
<br>**Country** Country or territory where airport is located. See countries.dat to cross-reference to ISO 3166-1 codes.  
<br>**IATA** 3-letter IATA code. Null if not assigned/unknown. 
<br>**ICAO** 4-letter ICAO code - Null if not assigned. 
<br>**Latitude** Decimal degrees, usually to six significant digits. Negative is South, positive is North. 
<br>**Longitude** Decimal degrees, usually to six significant digits. Negative is West, positive is East. 
<br>**Altitude** In feet. 
<br>**Timezone** Hours offset from UTC. Fractional hours are expressed as decimals, eg. India is 5.5. 
<br>**DST** Daylight savings time. 
<br>One of E (Europe), A (US/Canada), S (South America), O (Australia), Z (New Zealand), N (None) or U (Unknown). 
<br>See also: Help: Time  
<br>**Tz database time zone** Timezone in "tz" (Olson) format, eg. "America/Los_Angeles".  
<br>**Type** Type of the airport. 
<br>Value "airport" for air terminals, 
<br>"station" for train stations, 
<br>"port" for ferry terminals and 
<br>"unknown" if not known. 
<br>In airports.csv, only type=airport is included.  
<br>**Source** Source of this data. 
<br>"OurAirports" for data sourced from OurAirports, 
<br>"Legacy" for old data not matched to OurAirports (mostly DAFIF), 
<br>"User" for unverified user contributions. 
<br>In airports.csv, only source=OurAirports is included.  

The data is UTF-8 (Unicode) encoded.


#### Departure Delays Data - departureDelays.csv 
#### Source: United States Department of Transportation: Bureau of Transportation Statistics (TranStats)
#### https://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time 
#### March 2017 Data

<br>**flightDate** Flight Date (yyyymmddHHMM) 
<br>**originAirportID** Origin Airport, Airport ID. An identification number assigned by US DOT to identify a unique airport.
<br>**origin** Origin Airport.
<br>**originCity** Origin Airport, City Name.
<br>**originState** Origin Airport, State Code.
<br>**destAirportID** Destination Airport, Airport ID. An identification number assigned by US DOT to identify a unique airport.
<br>**destination** Destination Airport.
<br>**destinationCity** Destination Airport, City Name.
<br>**destinationState** Destination Airport, State Code.
<br>**depDelayInMinutes** Difference in minutes between scheduled and actual departure time. Early departures set to 0.
<br>**distanceInMiles** Distance between Airports (in miles)
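The flightDate field is kept as a string in the yyyymmddHHMM layout. As a minimal sketch, assuming values look like the `201703010000` seen in the sample rows, it can be parsed with the Python standard library as shown here. The helper name `parse_flight_date` is hypothetical and is not used elsewhere in this notebook.

```python
from datetime import datetime


def parse_flight_date(raw):
    """Parse a yyyymmddHHMM string such as '201703010000' into a datetime."""
    return datetime.strptime(raw, "%Y%m%d%H%M")


ts = parse_flight_date("201703010000")
print(ts.isoformat())
```

Keeping the column as a string is enough for this notebook, since the analysis below only orders and groups flights and never does date arithmetic.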

#### Configure Spark Environment

In [1]:
import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

#### Create and Initialize Spark Driver

In [2]:
## Create SparkContext, SparkSession
from pyspark.sql import SparkSession
from pyspark import SparkContext,SparkConf
conf = SparkConf().setAll([('spark.jars.packages','graphframes:graphframes:0.5.0-spark2.1-s_2.11')])
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

#### Verify Spark Driver - Spark Context and Spark Sessions

In [3]:
## Verify Spark Context
sc

In [4]:
## Verify Spark Session
spark

#### Load Dependent Libraries

#### Spark libraries

In [5]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from graphframes import *

In [6]:
import numpy as np
import StringIO
import pandas as pd
import warnings

In [7]:
# Plotting libraries
import matplotlib.pyplot as plt
import seaborn as sns

import plotly
#plotly.tools.set_credentials_file(username='<your_user_id>', api_key='<your_api_key>')
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly.figure_factory as ff
import plotly.plotly as py
import plotly.offline as pyoff
import plotly.graph_objs as go

# Initializing some settings
sns.set_style('whitegrid')
sns.set(color_codes=True)
warnings.filterwarnings('ignore')
pyoff.init_notebook_mode(connected=True)
get_ipython().magic('matplotlib inline')

#### Create dataframes from the 2 datasets.

#### Set file paths

In [8]:
tripdelaysFilePath = "/user/thomasj/departureDelays.csv"
airportsnaFilePath = "/user/thomasj/airports.dat"

#### Define schema for airports data

In [9]:
airportsDataSchema = StructType([
         StructField("airportID", IntegerType(), True),
         StructField("airportName", StringType(), True),
         StructField("city", StringType(), True),
         StructField("country", StringType(), True),
         StructField("IATA", StringType(), True),
         StructField("ICAO", StringType(), True),
         StructField("latitude", DoubleType(), True),
         StructField("longitude", DoubleType(), True),        
         StructField("altitude", IntegerType(), True),
         StructField("timezone", IntegerType(), True),  # offsets can be fractional (e.g. 5.5); DoubleType would preserve them
         StructField("dst", StringType(), True),
         StructField("tzDBTimezone", StringType(), True),
         StructField("type", StringType(), True),
         StructField("source", StringType(), True)])

#### Create dataframe from airports dataset

In [10]:
airportsDF = spark.read.format("csv")\
            .option("header", "false")\
            .option("inferSchema", "false")\
            .load(airportsnaFilePath, schema = airportsDataSchema)

#### Verify counts

In [11]:
print("Rows and columns in Airports dataset are {} and {}".format(airportsDF.count(), len(airportsDF.columns)))

Rows and columns in Airports dataset are 7184 and 14


#### Verify records

In [12]:
airportsDF.show(4)

+---------+--------------------+-----------+----------------+----+----+------------------+------------------+--------+--------+---+--------------------+-------+-----------+
|airportID|         airportName|       city|         country|IATA|ICAO|          latitude|         longitude|altitude|timezone|dst|        tzDBTimezone|   type|     source|
+---------+--------------------+-----------+----------------+----+----+------------------+------------------+--------+--------+---+--------------------+-------+-----------+
|        1|      Goroka Airport|     Goroka|Papua New Guinea| GKA|AYGA|-6.081689834590001|     145.391998291|    5282|      10|  U|Pacific/Port_Moresby|airport|OurAirports|
|        2|      Madang Airport|     Madang|Papua New Guinea| MAG|AYMD|    -5.20707988739|     145.789001465|      20|      10|  U|Pacific/Port_Moresby|airport|OurAirports|
|        3|Mount Hagen Kagam...|Mount Hagen|Papua New Guinea| HGU|AYMH|-5.826789855957031|144.29600524902344|    5388|      10|  U|Paci

#### Define schema for departure Delays data

In [13]:
departureDelaysSchema = StructType([
         StructField("flightDate", StringType(), True),
         StructField("originAirportID", IntegerType(), True),
         StructField("origin", StringType(), True),
         StructField("originCity", StringType(), True),
         StructField("originState", StringType(), True),
         StructField("destAirportID", IntegerType(), True),
         StructField("destination", StringType(), True),
         StructField("destinationCity", StringType(), True),
         StructField("destinationState", StringType(), True),
         StructField("depDelayInMinutes", DoubleType(), True),        
         StructField("distanceInMiles", DoubleType(), True)])

#### Create dataframe from departureDelays dataset

In [14]:
departureDelaysDF = spark.read.format("csv")\
                    .option("header", "true")\
                    .option("inferSchema", "true")\
                    .load(tripdelaysFilePath, schema = departureDelaysSchema)

In [15]:
print("Rows and columns in Departure Delays dataset are {} and {}".format(departureDelaysDF.count(), len(departureDelaysDF.columns)))

Rows and columns in Departure Delays dataset are 488596 and 11


#### Cache departure delays dataframe

In [16]:
departureDelaysDF.cache()

DataFrame[flightDate: string, originAirportID: int, origin: string, originCity: string, originState: string, destAirportID: int, destination: string, destinationCity: string, destinationState: string, depDelayInMinutes: double, distanceInMiles: double]

In [17]:
departureDelaysDF.show(10)

+------------+---------------+------+-------------+-----------+-------------+-----------+---------------+----------------+-----------------+---------------+
|  flightDate|originAirportID|origin|   originCity|originState|destAirportID|destination|destinationCity|destinationState|depDelayInMinutes|distanceInMiles|
+------------+---------------+------+-------------+-----------+-------------+-----------+---------------+----------------+-----------------+---------------+
|201703010000|          10693|   BNA|Nashville, TN|         TN|        12892|        LAX|Los Angeles, CA|              CA|             null|         1797.0|
|201703010000|          10721|   BOS|   Boston, MA|         MA|        10821|        BWI|  Baltimore, MD|              MD|             null|          369.0|
|201703010000|          10800|   BUR|  Burbank, CA|         CA|        13796|        OAK|    Oakland, CA|              CA|             null|          325.0|
|201703010000|          10800|   BUR|  Burbank, CA|       

#### Define Tables/Views for the above dataframes to execute sql queries

In [18]:
airportsDF.createOrReplaceTempView("airportsDF_SQL")
departureDelaysDF.createOrReplaceTempView("departureDelaysDF_SQL")

The airports dataset contains details of all airports,
<br>so keep only the airports for which at least one record exists in the trips dataset:
<br>1. Get the unique airport codes (IATA) by combining origin and destination from the departureDelays dataset
<br>2. Keep only the airports from airportsDF whose code appears in that list

#### Extract all unique IATA codes from the departure delays dataframe

In the data the IATA codes are available in origin and destination fields
<br>Find the unique origin codes 
<br>Find the unique destination codes
<br>combine both the above 
<br>Find the unique/distinct set from the above
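The steps above amount to a set union. The plain-Python sketch below shows the same logic on a handful of made-up rows (the tuples are hypothetical sample data, not taken from the file); the notebook itself does this in Spark SQL.

```python
# Hypothetical sample rows: (origin, originState, destination, destinationState)
trips = [
    ("BNA", "TN", "LAX", "CA"),
    ("BOS", "MA", "BWI", "MD"),
    ("BUR", "CA", "OAK", "CA"),
    ("LAX", "CA", "BNA", "TN"),
]

# Unique (code, state) pairs seen as an origin and as a destination.
origins = {(origin, state) for origin, state, _, _ in trips}
destinations = {(dest, state) for _, _, dest, state in trips}

# The union of two sets is already distinct; sort only for stable display.
unique_codes = sorted(origins | destinations)
print(unique_codes)
```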

In [19]:
uniqueIATACodes = spark.sql("""SELECT DISTINCT iata, state FROM 
                           (SELECT DISTINCT origin AS iata, originState as state FROM departureDelaysDF_SQL
                            UNION ALL
                            SELECT DISTINCT destination AS iata, destinationState as state FROM departureDelaysDF_SQL)
                            a
                            """)

In [20]:
uniqueIATACodes.show(5)

+----+-----+
|iata|state|
+----+-----+
| SJC|   CA|
| BTV|   VT|
| MEI|   MS|
| ROC|   NY|
| SAF|   NM|
+----+-----+
only showing top 5 rows



In [21]:
uniqueIATACodes.createOrReplaceTempView("uniqueIATACodes_SQL")

#### Keep the airports that exist in the above list
Only include airports with at least one trip in the departureDelays dataset

In [22]:
airportsNADF = spark.sql("""SELECT at.IATA, at.city, it.state, at.country 
                            FROM airportsDF_SQL at 
                            JOIN uniqueIATACodes_SQL it 
                            ON at.IATA = it.IATA""")

In [23]:
airportsNADF.cache()
airportsNADF.createOrReplaceTempView("airportsNADF_SQL")

In [24]:
airportsNADF.show(4)

+----+----------+-----+-------------+
|IATA|      city|state|      country|
+----+----------+-----+-------------+
| SJC|  San Jose|   CA|United States|
| BTV|Burlington|   VT|United States|
| MEI|  Meridian|   MS|United States|
| ROC| Rochester|   NY|United States|
+----+----------+-----+-------------+
only showing top 4 rows



#### Build departureDelays_imp DataFrame
Obtain key attributes such as Date of flight, delays, distance, and airport information (Origin, Destination)

In [25]:
departureDelays_imp = spark.sql("""SELECT 
                                    CAST(ddt.flightDate AS STRING) AS flightDate,
                                    CAST(ddt.depDelayInMinutes AS INT) AS departureDelayInMinutes, 
                                    CAST(ddt.distanceInMiles AS INT) AS distanceInMiles, 
                                    ddt.origin AS src, 
                                    ddt.destination AS dst, 
                                    ats.city AS src_city, 
                                    atd.city AS dst_city, 
                                    ddt.originState AS src_state, 
                                    ddt.destinationState AS dst_state 
                                    FROM departureDelaysDF_SQL ddt 
                                    JOIN airportsNADF_SQL ats 
                                    ON ats.iata = ddt.origin 
                                    JOIN airportsNADF_SQL atd 
                                    ON atd.iata = ddt.destination""")

#### Add a unique index column (sequential number) to the above dataframe

In [26]:
# Modified Schema
schemaNew  = StructType([StructField("tripId", LongType(), False)] + departureDelays_imp.schema.fields[:])

In [27]:
### from pyspark.sql.functions import monotonically_increasing_id
### This will return a new DF with all the columns + id

### * Returns monotonically increasing 64-bit integers.
### *
### * The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
### * The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits represent the 
### * record number within each partition. The assumption is that the data frame has less than 1 billion partitions, 
### * and each partition has less than 8 billion records.
###
### departureDelays_imp = departureDelays_imp.withColumn("tripId", monotonically_increasing_id())
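To see why such IDs are unique but not consecutive, the bit layout described in the comment above can be reproduced in plain Python. This is only an illustration of the documented layout; the function `monotonic_id` is a hypothetical helper, not a Spark API.

```python
def monotonic_id(partition_id, record_number):
    """Compose an ID with the partition ID in the upper 31 bits
    and the record number in the lower 33 bits."""
    return (partition_id << 33) | record_number


# Two partitions with three rows each (a hypothetical layout).
ids = [monotonic_id(p, r) for p in range(2) for r in range(3)]
print(ids)

# An index-based numbering, by contrast, is consecutive across partitions.
consecutive = list(range(len(ids)))
print(consecutive)
```

The jump between the last row of partition 0 and the first row of partition 1 is why this notebook uses an index-based tripId instead: a later step compares tripId differences, which presumably only makes sense when the numbers are sequential.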

In [28]:
# zipWithIndex yields (row, index) pairs; put the index first as tripId, followed by the row's columns
departureDelays_imp = (departureDelays_imp.rdd
                       .zipWithIndex()
                       .map(lambda pair: (pair[1],) + tuple(pair[0]))
                       .toDF(schemaNew))

In [29]:
departureDelays_imp = departureDelays_imp.fillna(0)

In [30]:
departureDelays_imp.cache()
departureDelays_imp.createOrReplaceTempView("departureDelays_imp_SQL")

In [31]:
departureDelays_imp.show(5)

+------+------------+-----------------------+---------------+---+---+---------+-----------+---------+---------+
|tripId|  flightDate|departureDelayInMinutes|distanceInMiles|src|dst| src_city|   dst_city|src_state|dst_state|
+------+------------+-----------------------+---------------+---+---+---------+-----------+---------+---------+
|     0|201703010000|                      0|           1797|BNA|LAX|Nashville|Los Angeles|       TN|       CA|
|     1|201703010000|                      0|            369|BOS|BWI|   Boston|  Baltimore|       MA|       MD|
|     2|201703010000|                      0|            325|BUR|OAK|  Burbank|    Oakland|       CA|       CA|
|     3|201703010000|                      0|            296|BUR|SJC|  Burbank|   San Jose|       CA|       CA|
|     4|201703010000|                      0|            296|BUR|SJC|  Burbank|   San Jose|       CA|       CA|
+------+------------+-----------------------+---------------+---+---+---------+-----------+---------+---

### Building the Graph
<br>Now that we've imported our data, we need to build our graph.
<br>To do so we build two things: the structure of the vertices (or nodes) and the structure of the edges.
<br>GraphFrames makes this process simple:
<br>- Rename the IATA airport code to id in the vertices table
<br>- Name the start and end airports src and dst in the edges table (flights)

<br>These are required naming conventions for vertices and edges in GraphFrames as of the time of this writing.

<br>**Note:** ensure you have already installed the GraphFrames Spark package.

In [32]:
from graphframes import *

**Create Vertices (airports) and Edges (flights)**
<br>Users can create GraphFrames from vertex and edge DataFrames.
<br>Vertex DataFrame: A vertex DataFrame should contain a special column named “id” which specifies unique IDs for each vertex in the graph.
<br>Edge DataFrame: An edge DataFrame should contain two special columns: “src” (source vertex ID of edge) and “dst” (destination vertex ID of edge).
<br>Both DataFrames can have arbitrary other columns. Those columns can represent vertex and edge attributes.
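Before building a graph it can be useful to confirm that every edge's src and dst refer to a known vertex id. The sketch below does this check in plain Python on made-up records. The function `dangling_edges` is a hypothetical helper, and GraphFrames is not assumed to run such a check itself. In this notebook the earlier joins against airportsNADF_SQL already restrict the flights to known airports.

```python
# Hypothetical vertex and edge records following the id / src / dst naming conventions.
vertices = [
    {"id": "SFO", "city": "San Francisco"},
    {"id": "SEA", "city": "Seattle"},
]
edges = [
    {"src": "SFO", "dst": "SEA", "delay": 12},
    {"src": "SEA", "dst": "JFK", "delay": 0},  # JFK has no vertex record
]


def dangling_edges(vertices, edges):
    """Return the edges whose src or dst does not match any vertex id."""
    ids = {v["id"] for v in vertices}
    return [e for e in edges if e["src"] not in ids or e["dst"] not in ids]


print(dangling_edges(vertices, edges))
```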

In [33]:
tripVertices = airportsNADF.withColumnRenamed("IATA", "id").distinct()
tripEdges = departureDelays_imp.select("tripId", "departureDelayInMinutes", "distanceInMiles", "src", "dst", "src_city", "src_state", "dst_city", "dst_state")

In [34]:
tripVertices.cache()
tripEdges.cache()

DataFrame[tripId: bigint, departureDelayInMinutes: int, distanceInMiles: int, src: string, dst: string, src_city: string, src_state: string, dst_city: string, dst_state: string]

#### Vertices - The vertices of our graph are the airports

In [35]:
tripVertices.show(4)

+---+----------+-----+-------------+
| id|      city|state|      country|
+---+----------+-----+-------------+
|OGG|   Kahului|   HI|United States|
|GSO|Greensboro|   NC|United States|
|DEN|    Denver|   CO|United States|
|CVG|Cincinnati|   KY|United States|
+---+----------+-----+-------------+
only showing top 4 rows



#### Edges - The edges of our graph are the flights between airports

In [36]:
tripEdges.show(4)

+------+-----------------------+---------------+---+---+---------+---------+-----------+---------+
|tripId|departureDelayInMinutes|distanceInMiles|src|dst| src_city|src_state|   dst_city|dst_state|
+------+-----------------------+---------------+---+---+---------+---------+-----------+---------+
|     0|                      0|           1797|BNA|LAX|Nashville|       TN|Los Angeles|       CA|
|     1|                      0|            369|BOS|BWI|   Boston|       MA|  Baltimore|       MD|
|     2|                      0|            325|BUR|OAK|  Burbank|       CA|    Oakland|       CA|
|     3|                      0|            296|BUR|SJC|  Burbank|       CA|   San Jose|       CA|
+------+-----------------------+---------------+---+---+---------+---------+-----------+---------+
only showing top 4 rows



#### Build **tripGraph** GraphFrame
This GraphFrame is built on the vertices and edges derived from our trips (flights)

In [37]:
tripGraph = GraphFrame(tripVertices, tripEdges)
print(tripGraph)

GraphFrame(v:[id: string, city: string ... 2 more fields], e:[src: string, dst: string ... 7 more fields])


#### Build tripGraphPrime GraphFrame
This graphframe contains a smaller subset of data to make it easier to display motifs and subgraphs (below)

In [38]:
tripEdgesPrime = departureDelays_imp.select("tripId", "departureDelayInMinutes", "distanceInMiles", "src", "dst")
tripGraphPrime = GraphFrame(tripVertices, tripEdgesPrime)
print(tripGraphPrime)

GraphFrame(v:[id: string, city: string ... 2 more fields], e:[src: string, dst: string ... 3 more fields])


#### Determine the number of airports and trips

In [39]:
print("Airports: {} and Trips: {}".format(tripGraph.vertices.count(), tripGraph.edges.count()))

Airports: 294 and Trips: 488534


#### Determining the longest delay in this dataset

In [40]:
longestDelay = tripGraph.edges.groupBy().max("departureDelayInMinutes")
longestDelay.show()

+----------------------------+
|max(departureDelayInMinutes)|
+----------------------------+
|                        1773|
+----------------------------+



#### 1. Determining the number of delayed vs. on-time / early flights

In [41]:
print("On-time / Early Flights: {}".format(tripGraph.edges.filter("departureDelayInMinutes <= 0").count()))
print("Delayed Flights: {}".format(tripGraph.edges.filter("departureDelayInMinutes > 0").count()))

On-time / Early Flights: 320928
Delayed Flights: 167606


#### 2. Which flights departing from 'SFO' are most likely to have significant delays?
Note: a delay <= 0 means the flight left on time or early, so the filter below keeps only delayed flights (delay > 0).

In [42]:
tripGraph.edges\
.filter("src = 'SFO' AND departureDelayInMinutes > 0")\
.groupBy("src", "dst")\
.avg("departureDelayInMinutes")\
.sort(desc("avg(departureDelayInMinutes)")).show()

+---+---+----------------------------+
|src|dst|avg(departureDelayInMinutes)|
+---+---+----------------------------+
|SFO|MRY|           69.33333333333333|
|SFO|TUS|           68.93181818181819|
|SFO|SMF|           62.06818181818182|
|SFO|ABQ|          60.888888888888886|
|SFO|SBP|                      57.625|
|SFO|RDM|           55.61538461538461|
|SFO|SBA|          55.333333333333336|
|SFO|IND|           55.15384615384615|
|SFO|MIA|           55.03333333333333|
|SFO|PSC|                        54.0|
|SFO|ONT|          50.196969696969695|
|SFO|SLC|          48.396039603960396|
|SFO|PIT|                       48.25|
|SFO|OTH|                        48.2|
|SFO|RDD|          47.833333333333336|
|SFO|MCI|           47.57142857142857|
|SFO|SAT|           45.30769230769231|
|SFO|JFK|            44.9593220338983|
|SFO|DTW|          44.888888888888886|
|SFO|JAC|          44.083333333333336|
+---+---+----------------------------+
only showing top 20 rows



#### 3. Which destinations tend to have delays ?

In [43]:
tripDelays = tripGraph.edges.filter("departureDelayInMinutes > 0")\
.groupBy("dst_state")\
.avg("departureDelayInMinutes")

tripDelays.show(4)

+---------+----------------------------+
|dst_state|avg(departureDelayInMinutes)|
+---------+----------------------------+
|       SC|          41.598692810457514|
|       AZ|          26.147823033707866|
|       LA|           37.53012048192771|
|       MN|           43.63625038928683|
+---------+----------------------------+
only showing top 4 rows



In [44]:
df = tripDelays.toPandas()
df.head()

  dst_state  avg(departureDelayInMinutes)
0        SC                     41.598693
1        AZ                     26.147823
2        LA                      37.53012
3        MN                      43.63625
4        NJ                     51.622002


In [45]:
# Refer: https://plot.ly/python/choropleth-maps/
for col in df.columns:
    df[col] = df[col].astype(str)

scl = [[0.0, 'rgb(242,240,247)'],
       [0.2, 'rgb(218,218,235)'],
       [0.4, 'rgb(188,189,220)'],
       [0.6, 'rgb(158,154,200)'],
       [0.8, 'rgb(117,107,177)'],
       [1.0, 'rgb(84,39,143)']]

data = [ dict(
        type='choropleth',
        colorscale = scl,
        autocolorscale = False,
        locations = df['dst_state'],
        z = df['avg(departureDelayInMinutes)'].astype(float),
        locationmode = 'USA-states',
        marker = dict(
            line = dict (
                color = 'rgb(255,255,255)',
                width = 2
            ) ),
        colorbar = dict(
            title = "Delay in Minutes")
        ) ]

layout = dict(
        title = 'What destinations tend to have delays by State<br>(Hover for breakdown)',
        geo = dict(
            scope='usa',
            projection=dict( type='albers usa' ),
            showlakes = True,
            lakecolor = 'rgb(255, 255, 255)'),
             )
    
fig = dict( data=data, layout=layout )
iplot( fig, filename='d3-cloropleth-map' )

#### 4. Which destinations tend to have significant delays departing from SEA?
States with the highest average delay, counting only individual delays > 100 minutes (origin: Seattle)

In [46]:
tripDelaysSEA = tripGraph.edges.filter("src = 'SEA' and departureDelayInMinutes > 100")\
.groupBy("dst_state")\
.avg("departureDelayInMinutes")

tripDelaysSEA.show(4)

+---------+----------------------------+
|dst_state|avg(departureDelayInMinutes)|
+---------+----------------------------+
|       AZ|                       220.0|
|       SC|                       150.0|
|       MN|          178.66666666666666|
|       NJ|          148.85714285714286|
+---------+----------------------------+
only showing top 4 rows



In [47]:
df = tripDelaysSEA.toPandas()
df.head()

  dst_state  avg(departureDelayInMinutes)
0        AZ                         220.0
1        SC                         150.0
2        MN                    178.666667
3        NJ                    148.857143
4        OR                    238.166667


In [48]:
# Refer: https://plot.ly/python/choropleth-maps/
for col in df.columns:
    df[col] = df[col].astype(str)

scl = [[0.0, 'rgb(242,240,247)'],
       [0.2, 'rgb(218,218,235)'],
       [0.4, 'rgb(188,189,220)'],            
       [0.6, 'rgb(158,154,200)'],
       [0.8, 'rgb(117,107,177)'],
       [1.0, 'rgb(84,39,143)']]

data = [ dict(
        type='choropleth',
        colorscale = scl,
        autocolorscale = False,
        locations = df['dst_state'],
        z = df['avg(departureDelayInMinutes)'].astype(float),
        locationmode = 'USA-states',
        marker = dict(
            line = dict (
                color = 'rgb(255,255,255)',
                width = 2
            ) ),
        colorbar = dict(
            title = "Delay in Minutes")
        ) ]

layout = dict(
        title = 'What destinations tend to have significant delays departing from SEA<br>(Hover for breakdown)',
        geo = dict(
            scope='usa',
            projection=dict( type='albers usa' ),
            showlakes = True,
            lakecolor = 'rgb(255, 255, 255)'),
             )
    
fig = dict( data=data, layout=layout )
iplot( fig, filename='d3-cloropleth-map' )

#### 5. Vertex Degrees
<br>**inDegrees:** Incoming connections to the airport
<br>**outDegrees:** Outgoing connections from the airport
<br>**degrees:** Total connections to and from the airport
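As a small illustration of the three definitions above, the counts can be derived from a list of (src, dst) pairs in plain Python. The flights below are made up. Every flight counts once, so an airport's degree reflects its number of flights rather than its number of distinct routes.

```python
from collections import Counter

# Hypothetical flights as (src, dst) pairs; repeated pairs are separate flights.
flights = [("SFO", "SEA"), ("SFO", "SEA"), ("SEA", "SFO"), ("SFO", "LAX")]

out_degrees = Counter(src for src, _ in flights)  # outgoing connections
in_degrees = Counter(dst for _, dst in flights)   # incoming connections
degrees = out_degrees + in_degrees                # total connections

print(degrees["SFO"], in_degrees["SFO"], out_degrees["SFO"])
```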

#### Degree

In [49]:
# Degrees - The number of degrees - the number of incoming and outgoing connections - for various airports within this sample dataset
tripGraphDegrees = tripGraph.degrees.sort(desc("degree")).limit(20)
tripGraphDegrees.show()

+---+------+
| id|degree|
+---+------+
|ATL| 64887|
|ORD| 43960|
|DEN| 37329|
|LAX| 36152|
|DFW| 30989|
|PHX| 28667|
|SFO| 28175|
|LAS| 25708|
|MCO| 24483|
|IAH| 23093|
|MSP| 22427|
|DTW| 21804|
|SEA| 21429|
|BOS| 21184|
|SLC| 19953|
|EWR| 19841|
|CLT| 19483|
|FLL| 17099|
|BWI| 16739|
|JFK| 16624|
+---+------+



In [50]:
df = tripGraphDegrees.toPandas()
df.head()

    id  degree
0  ATL   64887
1  ORD   43960
2  DEN   37329
3  LAX   36152
4  DFW   30989


In [51]:
data = [go.Bar(x=df.id, y=df.degree)]
layout = go.Layout(title='Degree - Top 20 - Descending Order',)
fig = go.Figure(data=data, layout=layout)
iplot(fig, filename='basic_bar1')

#### 6. Motif Finding

City / Flight Relationships through Motif Finding
<br>To more easily understand the complex relationships between city airports and their flights,
<br>we can use motifs to find patterns of airports (i.e. vertices) connected by flights (i.e. edges).
<br>The result is a DataFrame in which the column names are given by the motif keys.

<br>**What delays might we blame on SFO?**
<br>Using tripGraphPrime to more easily display
<br>- The associated edge (ab, bc) relationships
<br>- The different cities / airports (a, b, c), where SFO is the connecting city (b)
<br>- Ensuring that flight ab (i.e. the flight to SFO) occurred before flight bc (i.e. the flight leaving SFO)
<br>- Note, tripId here is the sequential row index assigned by zipWithIndex above, so it follows time only to the extent that the rows are ordered by flightDate
<br>  -- With 488,534 trips in one month (roughly 15,800 per day), bc.tripId < ab.tripId + 10000 means the second flight (bc) occurred within roughly a day of the first flight (ab)

<br>**Note:** In reality, we would need to be more careful to link trips ab and bc.
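The two-hop pattern and its conditions can be illustrated on a toy edge list in plain Python. This is only a sketch of the matching logic, not of how GraphFrames evaluates motifs. The edges, the helper `two_hop_via` and its parameter names are hypothetical.

```python
# Hypothetical edges: (tripId, delay, src, dst)
edges = [
    (1, 600, "BUR", "SFO"),
    (2, -5, "SFO", "SLC"),
    (3, 10, "SFO", "BUR"),
    (20000, 0, "SFO", "SAN"),
    (4, 5, "SEA", "LAX"),
]


def two_hop_via(hub, edges, min_delay=500, window=10000):
    """Pairs (ab, bc) with ab ending and bc starting at hub, where either
    leg is delayed more than min_delay and bc follows ab within window IDs."""
    inbound = [e for e in edges if e[3] == hub]
    outbound = [e for e in edges if e[2] == hub]
    return [
        (ab, bc)
        for ab in inbound
        for bc in outbound
        if (ab[1] > min_delay or bc[1] > min_delay)
        and ab[0] < bc[0] < ab[0] + window
    ]


matches = two_hop_via("SFO", edges)
for ab, bc in matches:
    print(ab, "->", bc)
```

The flight to SAN is excluded because its tripId lies outside the window, and the SEA to LAX flight never touches the hub.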

In [52]:
motifs = tripGraphPrime.find("(a)-[ab]->(b); (b)-[bc]->(c)")\
  .filter("(b.id = 'SFO') and (ab.departureDelayInMinutes > 500 or bc.departureDelayInMinutes > 500) and bc.tripId > ab.tripId and bc.tripId < ab.tripId + 10000")

In [53]:
motifs.show(4, truncate = False)


+---------------------------------+---------------------------+---------------------------------------+--------------------------+----------------------------------------+
|a                                |ab                         |b                                      |bc                        |c                                       |
+---------------------------------+---------------------------+---------------------------------------+--------------------------+----------------------------------------+
|[BUR, Burbank, CA, United States]|[12532, 687, 326, BUR, SFO]|[SFO, San Francisco, CA, United States]|[12597, -5, 599, SFO, SLC]|[SLC, Salt Lake City, UT, United States]|
|[BUR, Burbank, CA, United States]|[12532, 687, 326, BUR, SFO]|[SFO, San Francisco, CA, United States]|[12601, -2, 326, SFO, BUR]|[BUR, Burbank, CA, United States]       |
|[BUR, Burbank, CA, United States]|[12532, 687, 326, BUR, SFO]|[SFO, San Francisco, CA, United States]|[12643, 11, 447, SFO, SAN]|[SAN, San 

#### 7. PageRank

#### Determining Airport Ranking using PageRank
There are a large number of flights and connections through these various airports included in this Departure Delay Dataset.
<br>Using the pageRank algorithm, Spark iteratively passes rank along the flights in the graph and determines a rough estimate of how important each airport is.
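To give a feel for what each iteration does, below is a simplified power-iteration sketch in plain Python on a toy hub-and-spoke graph. It is not the GraphFrames implementation: the function `pagerank_sketch` and the toy edges are assumptions for illustration, and the ranks here are normalized to sum to 1, which is a different scale from the `pagerank` column in the output.

```python
from collections import defaultdict


def pagerank_sketch(edges, reset_probability=0.15, max_iter=5):
    """Simplified PageRank by power iteration over a list of (src, dst) edges."""
    vertices = sorted({v for edge in edges for v in edge})
    n = len(vertices)

    out_degree = defaultdict(int)
    for src, _ in edges:
        out_degree[src] += 1

    # Start with equal rank everywhere.
    ranks = {v: 1.0 / n for v in vertices}

    for _ in range(max_iter):
        incoming = defaultdict(float)
        # Each vertex splits its current rank evenly across its outgoing edges.
        for src, dst in edges:
            incoming[dst] += ranks[src] / out_degree[src]
        # Rank held by vertices with no outgoing edges is spread over all vertices.
        dangling = sum(ranks[v] for v in vertices if out_degree[v] == 0)
        ranks = {
            v: reset_probability / n
            + (1.0 - reset_probability) * (incoming[v] + dangling / n)
            for v in vertices
        }
    return ranks


# Toy graph (hypothetical): three small airports that only connect through a hub.
toy_edges = [
    ("A", "HUB"), ("B", "HUB"), ("C", "HUB"),
    ("HUB", "A"), ("HUB", "B"), ("HUB", "C"),
]

toy_ranks = pagerank_sketch(toy_edges, max_iter=5)
for airport, rank in sorted(toy_ranks.items(), key=lambda kv: -kv[1]):
    print(airport, round(rank, 4))
```

In this toy graph the hub ends up with the highest rank because every other vertex sends all of its rank there, which is the same intuition behind the large hub airports leading the ranking.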

In [54]:
# Determining Airport ranking of importance using `pageRank`
ranks = tripGraph.pageRank(maxIter=5)
ranksDisplay = ranks.vertices.orderBy(ranks.vertices.pagerank.desc()).limit(20)
ranksDisplay.show()

+---+-----------------+-----+-------------+------------------+
| id|             city|state|      country|          pagerank|
+---+-----------------+-----+-------------+------------------+
|ATL|          Atlanta|   GA|United States|20.177171960860903|
|ORD|          Chicago|   IL|United States|13.157018642326195|
|DEN|           Denver|   CO|United States|10.342668730780478|
|DFW|Dallas-Fort Worth|   TX|United States| 8.964923534530998|
|LAX|      Los Angeles|   CA|United States| 8.455103072395062|
|PHX|          Phoenix|   AZ|United States| 7.423372726662571|
|MSP|      Minneapolis|   MN|United States| 7.297660788911538|
|SFO|    San Francisco|   CA|United States| 7.213926446645566|
|DTW|          Detroit|   MI|United States| 6.976776841528112|
|IAH|          Houston|   TX|United States| 6.865025182388982|
|SLC|   Salt Lake City|   UT|United States| 6.530381123302239|
|SEA|          Seattle|   WA|United States|6.2584847077915295|
|MCO|          Orlando|   FL|United States| 6.013041841

In [55]:
df = ranksDisplay.toPandas()
df["id_city_state"] = df["id"].map(str) + ", " + df["city"] + ", " + df["state"]
df.head()

Unnamed: 0,id,city,state,country,pagerank,id_city_state
0,ATL,Atlanta,GA,United States,20.177172,"ATL, Atlanta, GA"
1,ORD,Chicago,IL,United States,13.157019,"ORD, Chicago, IL"
2,DEN,Denver,CO,United States,10.342669,"DEN, Denver, CO"
3,DFW,Dallas-Fort Worth,TX,United States,8.964924,"DFW, Dallas-Fort Worth, TX"
4,LAX,Los Angeles,CA,United States,8.455103,"LAX, Los Angeles, CA"


In [56]:
data = [go.Bar(x=df.id_city_state, y=df.pagerank)]
layout = go.Layout(title='PageRank - Top 20 - Descending Order',)
fig = go.Figure(data=data, layout=layout)
iplot(fig, filename='basic_bar2')

#### 8. Single Hops
Most popular flights (single city hops)
<br>Using the tripGraph, we can quickly determine the most popular single city hop flights.
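Conceptually, this is just a count per (src, dst) pair. A minimal plain-Python sketch of the same idea, using a hypothetical list of flights rather than the dataset (the helper name `top_single_hops` is also an assumption):

```python
from collections import Counter


def top_single_hops(flights, n):
    """Count flights per (src, dst) pair and return the n most frequent pairs."""
    return Counter(flights).most_common(n)


# Hypothetical flights, one (src, dst) tuple per trip.
toy_flights = [
    ("SFO", "LAX"), ("LAX", "SFO"), ("SFO", "LAX"),
    ("JFK", "LAX"), ("SFO", "LAX"), ("LAX", "SFO"),
]

top_hops = top_single_hops(toy_flights, 2)
for (src, dst), trips in top_hops:
    print(src, dst, trips)
```

Note that each direction is counted separately, just as the edges of the graph are directed.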

In [57]:
import pyspark.sql.functions as func

topTrips = tripGraph \
  .edges \
  .groupBy("src", "dst") \
  .agg(func.count("*").alias("trips"))
  
topTrips20 = topTrips.orderBy(topTrips.trips.desc()).limit(20)

topTrips20.show(20)

+---+---+-----+
|src|dst|trips|
+---+---+-----+
|SFO|LAX| 1381|
|LAX|SFO| 1361|
|JFK|LAX| 1103|
|LAX|JFK| 1102|
|LAS|LAX|  972|
|LAX|LAS|  967|
|HNL|OGG|  859|
|OGG|HNL|  859|
|SEA|LAX|  817|
|LAX|SEA|  816|
|ATL|MCO|  796|
|MCO|ATL|  796|
|LGA|ORD|  772|
|PHX|DEN|  762|
|ORD|LGA|  758|
|DEN|PHX|  754|
|ATL|LGA|  753|
|LGA|ATL|  753|
|BOS|DCA|  738|
|DCA|BOS|  734|
+---+---+-----+



In [58]:
df = topTrips20.toPandas()
df["src_dst"] = df["src"].map(str) + "/" +df["dst"]
df.head()

Unnamed: 0,src,dst,trips,src_dst
0,SFO,LAX,1381,SFO/LAX
1,LAX,SFO,1361,LAX/SFO
2,JFK,LAX,1103,JFK/LAX
3,LAX,JFK,1102,LAX/JFK
4,LAS,LAX,972,LAS/LAX


In [59]:
data = [go.Bar(x=df.src_dst, y=df.trips)]
layout = go.Layout(title='Top20 - Single HOPS',)
fig = go.Figure(data=data, layout=layout)
iplot(fig, filename='basic_bar3')

#### 9. Transfer Cities
Top Transfer Cities
<br>Many airports are used as transfer points rather than as the final destination.
<br>An easy way to estimate this is to calculate the ratio of
<br>inDegree (the number of flights to the airport) / outDegree (the number of flights leaving the airport).
<br>Values close to 1 may indicate many transfers, whereas
<br>values < 1 indicate more outgoing than incoming flights and
<br>values > 1 indicate more incoming than outgoing flights.

<br>**Note:** this is a simple calculation that does not take into account the timing or scheduling of flights,
<br>just the overall aggregate number within the dataset.
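The ratio can be sketched in a few lines of plain Python on a toy edge list. The helper `degree_ratios` and the toy airports are hypothetical and only meant to show the arithmetic; airports that lack either incoming or outgoing flights are dropped, as an inner join on the two degree tables would do.

```python
from collections import Counter


def degree_ratios(edges):
    """Return {airport: inDegree / outDegree} for airports that have both."""
    in_degree = Counter(dst for _, dst in edges)
    out_degree = Counter(src for src, _ in edges)
    # Keep only airports present in both counts (inner-join behaviour).
    both = in_degree.keys() & out_degree.keys()
    return {airport: in_degree[airport] / out_degree[airport] for airport in both}


# Hypothetical flights: HUB is balanced, END receives more than it sends.
toy_edges = [
    ("A", "HUB"), ("HUB", "A"),
    ("B", "HUB"), ("HUB", "B"),
    ("A", "END"), ("B", "END"), ("END", "A"),
]

ratios = degree_ratios(toy_edges)
for airport in sorted(ratios):
    print(airport, ratios[airport])
```

A ratio of exactly 1.0 only says that arrivals and departures balance in aggregate; it says nothing about whether passengers actually connect there.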

In [60]:
# Calculate the inDeg (flights into the airport) and outDeg (flights leaving the airport)
inDeg = tripGraph.inDegrees
outDeg = tripGraph.outDegrees

In [61]:
# Calculate the degreeRatio (inDeg/outDeg)
degreeRatio = inDeg.join(outDeg, inDeg.id == outDeg.id) \
  .drop(outDeg.id) \
  .selectExpr("id", "double(inDegree)/double(outDegree) as degreeRatio") \
  .cache()

In [62]:
degreeRatio.show()

+---+------------------+
| id|       degreeRatio|
+---+------------------+
|BGM|               1.0|
|PSE|               1.0|
|INL|1.0188679245283019|
|MSY|1.0002501876407306|
|PPG|               1.0|
|GEG|0.9975155279503105|
|BUR|               1.0|
|SNA|1.0002941176470588|
|GRB|               1.0|
|GTF|0.9924242424242424|
|IDA|               1.0|
|GRR|0.9970674486803519|
|EUG|0.9964028776978417|
|PSG|               1.0|
|PVD|1.0008510638297872|
|GSO| 0.997946611909651|
|MYR|               1.0|
|OAK|0.9982762866289091|
|MQT|               1.0|
|FSM|               1.0|
+---+------------------+
only showing top 20 rows



In [63]:
# Join back to the `airports` DataFrame (instead of registering temp table as above)
nonTransferAirports = degreeRatio.join(airportsNADF, degreeRatio.id == airportsNADF.IATA) \
  .selectExpr("id", "city", "degreeRatio") \
  .filter("degreeRatio < .9 or degreeRatio > 1.1")

In [64]:
# List out the city airports which have abnormal degree ratios.
nonTransferAirports.show()

+---+----------+------------------+
| id|      city|       degreeRatio|
+---+----------+------------------+
|TWF|Twin Falls|0.8165137614678899|
+---+----------+------------------+



In [65]:
# Join back to the `airports` DataFrame (instead of registering temp table as above)
transferAirports = degreeRatio.join(airportsNADF, degreeRatio.id == airportsNADF.IATA) \
  .selectExpr("id", "city", "degreeRatio") \
  .filter("degreeRatio between 0.9 and 1.1")

In [66]:
# List out 20 transfer city airports, ordered by ascending degreeRatio
transferAirportsDF = transferAirports.orderBy("degreeRatio").limit(20)
transferAirportsDF.show()

+---+------------------+------------------+
| id|              city|       degreeRatio|
+---+------------------+------------------+
|KOA|              Kona|0.9718189581554227|
|MOT|             Minot|0.9772727272727273|
|HIB|           Hibbing|0.9818181818181818|
|PGD|       Punta Gorda|0.9827586206896551|
|PBG|       Plattsburgh|0.9838709677419355|
|BLI|        Bellingham|0.9871794871794872|
|GCC|          Gillette|0.9888888888888889|
|ACV|         Arcata CA| 0.989247311827957|
|SPI|       Springfield|0.9896907216494846|
|BIS|          Bismarck|0.9914529914529915|
|GTF|       Great Falls|0.9924242424242424|
|CID|      Cedar Rapids|0.9927272727272727|
|SGF|       Springfield|0.9936305732484076|
|TLH|       Tallahassee|0.9946236559139785|
|CHO|Charlottesville VA|0.9949748743718593|
|MBS|           Saginaw|0.9952830188679245|
|TUL|             Tulsa|0.9956597222222222|
|GJT|    Grand Junction|0.9957446808510638|
|BTV|        Burlington|0.9962264150943396|
|EUG|            Eugene|0.996402

In [67]:
df = transferAirportsDF.toPandas()
df["city_id"] = df["city"].map(str) + ", " +df["id"]
df.head()

Unnamed: 0,id,city,degreeRatio,city_id
0,KOA,Kona,0.971819,"Kona, KOA"
1,MOT,Minot,0.977273,"Minot, MOT"
2,HIB,Hibbing,0.981818,"Hibbing, HIB"
3,PGD,Punta Gorda,0.982759,"Punta Gorda, PGD"
4,PBG,Plattsburgh,0.983871,"Plattsburgh, PBG"


In [68]:
data = [go.Bar(x=df.city_id, y=df.degreeRatio)]
layout = go.Layout(title='Top20 - Transfer Cities',)
fig = go.Figure(data=data, layout=layout)
iplot(fig, filename='basic_bar4')

#### 10. Breadth First Search
Breadth-first search (BFS) traverses the graph outward from the starting vertices, one hop at a time, to quickly find the desired vertices
<br>(i.e. airports) and edges (i.e. flights).
<br>Let's try to find the smallest number of connections between cities based on the dataset.
<br>**Note:** These examples do not take into account time or distance, just hops between cities.
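For intuition, here is a minimal breadth-first search in plain Python over a toy route list. It is a sketch under stated assumptions, not what GraphFrames does internally: `fewest_hops` is a hypothetical helper, the routes are illustrative, and it returns a single path of airports, whereas the `bfs` calls in this section return a DataFrame of matching paths.

```python
from collections import deque


def fewest_hops(edges, start, goal, max_path_length):
    """Return one path with the fewest hops from start to goal, or None."""
    neighbors = {}
    for src, dst in edges:
        neighbors.setdefault(src, set()).add(dst)

    queue = deque([[start]])
    visited = {start}
    while queue:
        path = queue.popleft()
        if path[-1] == goal:
            return path
        # A path with k edges has k + 1 vertices; stop expanding at the limit.
        if len(path) - 1 >= max_path_length:
            continue
        for nxt in sorted(neighbors.get(path[-1], ())):
            if nxt not in visited:
                visited.add(nxt)
                queue.append(path + [nxt])
    return None


# Toy routes (hypothetical, not read from the dataset).
toy_routes = [("SEA", "SFO"), ("SFO", "SEA"), ("SFO", "BOS"), ("BOS", "BUF")]

direct = fewest_hops(toy_routes, "SEA", "SFO", max_path_length=1)
no_direct = fewest_hops(toy_routes, "SFO", "BUF", max_path_length=1)
one_stop = fewest_hops(toy_routes, "SFO", "BUF", max_path_length=2)
print(direct, no_direct, one_stop)
```

Because the queue is processed in order of path length, the first path that reaches the goal uses the fewest hops, which is why raising `maxPathLength` from 1 to 2 is enough to find a one-stop connection.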

#### Example 1: Direct Seattle to San Francisco

In [69]:
filteredPaths1 = tripGraph.bfs(
  fromExpr = "id = 'SEA'",
  toExpr = "id = 'SFO'",
  maxPathLength = 1)

filteredPaths1.show()

+--------------------+--------------------+--------------------+
|                from|                  e0|                  to|
+--------------------+--------------------+--------------------+
|[SEA, Seattle, WA...|[594, -7, 679, SE...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[1877, -2, 679, S...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[2084, -5, 679, S...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[2458, -4, 679, S...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[3757, -8, 679, S...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[4461, -4, 679, S...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[4502, -2, 679, S...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[5763, -7, 679, S...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[6134, -5, 679, S...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[7491, -8, 679, S...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[7801, -9, 679, S...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[7823, -5, 679, S...|[SFO, San Francis...|
|[SEA, Seattle, WA...|[95

As you can see, there are a number of direct flights between Seattle and San Francisco.

#### Example 2: Direct San Francisco to Buffalo

In [70]:
filteredPaths2 = tripGraph.bfs(
  fromExpr = "id = 'SFO'",
  toExpr = "id = 'BUF'",
  maxPathLength = 1)

filteredPaths2.show()

+---+----+-----+-------+
| id|city|state|country|
+---+----+-----+-------+
+---+----+-----+-------+



As you can see, there are no direct flights between San Francisco and Buffalo.

#### Example 3: Flying from San Francisco to Buffalo

In [71]:
filteredPaths3 = tripGraph.bfs(
  fromExpr = "id = 'SFO'",
  toExpr = "id = 'BUF'",
  maxPathLength = 2)

filteredPaths3.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                from|                  e0|                  v1|                  e1|                  to|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|[SFO, San Francis...|[1691, -3, 2704, ...|[BOS, Boston, MA,...|[1236, -10, 395, ...|[BUF, Buffalo, NY...|
|[SFO, San Francis...|[1691, -3, 2704, ...|[BOS, Boston, MA,...|[5433, 34, 395, B...|[BUF, Buffalo, NY...|
|[SFO, San Francis...|[1691, -3, 2704, ...|[BOS, Boston, MA,...|[10585, 0, 395, B...|[BUF, Buffalo, NY...|
|[SFO, San Francis...|[1691, -3, 2704, ...|[BOS, Boston, MA,...|[14210, -3, 395, ...|[BUF, Buffalo, NY...|
|[SFO, San Francis...|[1691, -3, 2704, ...|[BOS, Boston, MA,...|[16789, -4, 395, ...|[BUF, Buffalo, NY...|
|[SFO, San Francis...|[1691, -3, 2704, ...|[BOS, Boston, MA,...|[20355, -6, 395, ...|[BUF, Buffalo, NY...|
|[SFO, San Francis...|[1691, -3, 2704

As you can see, there are flights from San Francisco to Buffalo with Boston (BOS) as the transfer point in the rows shown.