### Introduction

The [Bureau of Transportation Statistics](https://www.transtats.bts.gov) regularly publishes data about domestic flights. Information such as flight departure and arrival airports, carrier information, flight times, delays, cancellations, etc. is available for free and can be analyzed by anyone with an interest.

There is a lot of interesting information on the [Bureau of Transportation Statistics](https://www.transtats.bts.gov) website, but the data in this analysis can be obtained [here](https://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236). You will have to download data one month at a time as far as I can tell.

### Setup

In [1]:
import sys

import pyspark.sql.functions as F
from graphframes import GraphFrame

# `spark` (SparkSession) and `sc` (SparkContext) are provided by the notebook environment.
# Set the Spark checkpoint directory: DataFrame.checkpoint() and GraphFrames'
# connectedComponents both require one.
sc.setCheckpointDir('/tmp')

In [2]:
print(sys.version)

3.6.8 (default, Aug  2 2019, 17:42:44) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)]


In [3]:
spark

In [4]:
# Data Locations
CARRIER_CODE_LOC = 's3://jornaya-ds-us-east-1-sandbox/csnyder/L_UNIQUE_CARRIERS.csv'
AIRPORT_CODE_LOC = 's3://jornaya-ds-us-east-1-sandbox/csnyder/L_AIRPORT.csv'
FLIGHT_DATA_LOC = 's3://jornaya-ds-us-east-1-sandbox/csnyder/dot_carrier_data'

CARRIER_CODE = 'carrier_code'
AIRPORT_CODE = 'airport_code'

### Data Import and Cleaning

We will be taking a look at flight data for all domestic flights from January 2013 to July 2019. The raw data contains both airline and airport codes, and so additional lookup tables are imported to make results human-readable.

#### Carrier Lookup Table

In [5]:
# Read in airline codes mapping
carrier_codes = spark.read.csv(CARRIER_CODE_LOC, header=True) \
    .selectExpr('Code as {}'.format(CARRIER_CODE), 'Description as carrier_name')

In [6]:
carrier_codes.limit(5).toPandas()

|   | carrier_code | carrier_name |
|---|---|---|
| 0 | 02Q | Titan Airways |
| 1 | 04Q | Tradewind Aviation |
| 2 | 05Q | Comlux Aviation, AG |
| 3 | 06Q | Master Top Linhas Aereas Ltd. |
| 4 | 07Q | Flair Airlines Ltd. |


#### Airport Lookup Table

In [7]:
# Read in airport codes mapping
airport_codes = spark.read.csv(AIRPORT_CODE_LOC, header=True) \
    .withColumn('darray', F.split(F.col('Description'), ': ')) \
    .select(
        'Code',
        F.col('darray')[0].alias('airport_city'),
        F.col('darray')[1].alias('airport_name')
    ) \
    .withColumnRenamed('Code', AIRPORT_CODE)

In [8]:
airport_codes.limit(5).toPandas()

|   | airport_code | airport_city | airport_name |
|---|---|---|---|
| 0 | 01A | Afognak Lake, AK | Afognak Lake Airport |
| 1 | 03A | Granite Mountain, AK | Bear Creek Mining Strip |
| 2 | 04A | Lik, AK | Lik Mining Camp |
| 3 | 05A | Little Squaw, AK | Little Squaw Airport |
| 4 | 06A | Kizhuyak, AK | Kizhuyak Bay |


#### Flight Data

In [9]:
# Read in the flight data, keep a subset of columns, and join in carrier names
air = spark.read.csv(FLIGHT_DATA_LOC, header=True) \
    .select('FL_DATE', 'OP_UNIQUE_CARRIER', 'ORIGIN', 'ORIGIN_STATE_ABR', 'DEST', 'DEST_STATE_ABR',
            'CANCELLED', 'CARRIER_DELAY', 'WEATHER_DELAY', 'year') \
    .withColumnRenamed('OP_UNIQUE_CARRIER', CARRIER_CODE) \
    .join(F.broadcast(carrier_codes), on=CARRIER_CODE, how='inner')

In [10]:
# Delays are in minutes and are null when there is no delay, so fill nulls with 0.
air = air.na.fill({'CARRIER_DELAY': 0, 'WEATHER_DELAY': 0})

In [11]:
air.count()

40796427

In [12]:
air.limit(10).toPandas()

|   | carrier_code | FL_DATE | ORIGIN | ORIGIN_STATE_ABR | DEST | DEST_STATE_ABR | CANCELLED | CARRIER_DELAY | WEATHER_DELAY | year | carrier_name |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | AA | 2016-10-01 | JFK | NY | LAX | CA | 0.0 | 0.0 | 0.0 | 2016 | American Airlines Inc. |
| 1 | AA | 2016-10-01 | LAX | CA | JFK | NY | 0.0 | 0.0 | 0.0 | 2016 | American Airlines Inc. |
| 2 | AA | 2016-10-01 | JFK | NY | LAX | CA | 0.0 | 0.0 | 0.0 | 2016 | American Airlines Inc. |
| 3 | AA | 2016-10-01 | DFW | TX | HNL | HI | 0.0 | 42.0 | 0.0 | 2016 | American Airlines Inc. |
| 4 | AA | 2016-10-01 | OKC | OK | DFW | TX | 0.0 | 0.0 | 0.0 | 2016 | American Airlines Inc. |
| 5 | AA | 2016-10-01 | OGG | HI | DFW | TX | 0.0 | 0.0 | 0.0 | 2016 | American Airlines Inc. |
| 6 | AA | 2016-10-01 | DFW | TX | OGG | HI | 0.0 | 0.0 | 0.0 | 2016 | American Airlines Inc. |
| 7 | AA | 2016-10-01 | HNL | HI | DFW | TX | 0.0 | 0.0 | 0.0 | 2016 | American Airlines Inc. |
| 8 | AA | 2016-10-01 | JFK | NY | SFO | CA | 0.0 | 0.0 | 0.0 | 2016 | American Airlines Inc. |
| 9 | AA | 2016-10-01 | LAX | CA | JFK | NY | 0.0 | 0.0 | 0.0 | 2016 | American Airlines Inc. |


### Data Summaries

Which carriers operated the most flights?

In [13]:
carrier_flight_counts = air \
    .groupBy(CARRIER_CODE, 'carrier_name') \
    .count()

In [14]:
carrier_flight_counts \
    .orderBy(F.desc('count')) \
    .limit(10).toPandas()

|   | carrier_code | carrier_name | count |
|---|---|---|---|
| 0 | WN | Southwest Airlines Co. | 8348262 |
| 1 | DL | Delta Air Lines Inc. | 5793891 |
| 2 | AA | American Airlines Inc. | 5075433 |
| 3 | OO | SkyWest Airlines Inc. | 4394430 |
| 4 | UA | United Air Lines Inc. | 3624232 |
| 5 | EV | ExpressJet Airlines LLC | 3119077 |
| 6 | B6 | JetBlue Airways | 1817011 |
| 7 | MQ | Envoy Air | 1612031 |
| 8 | AS | Alaska Airlines Inc. | 1248357 |
| 9 | US | US Airways Inc. | 1025753 |


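Which carriers operated the fewest flights? Sorting by `count` in ascending order answers this. (The output shown below was produced without the `orderBy`, so its rows appear in arbitrary order.)

In [15]:
carrier_flight_counts \
    .orderBy('count') \
    .limit(10).toPandas()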

|   | carrier_code | carrier_name | count |
|---|---|---|---|
| 0 | FL | AirTran Airways Corporation | 253447 |
| 1 | F9 | Frontier Airlines Inc. | 643451 |
| 2 | AS | Alaska Airlines Inc. | 1248357 |
| 3 | YX | Republic Airline | 503932 |
| 4 | UA | United Air Lines Inc. | 3624232 |
| 5 | MQ | Envoy Air | 1612031 |
| 6 | G4 | Allegiant Air | 161696 |
| 7 | AA | American Airlines Inc. | 5075433 |
| 8 | B6 | JetBlue Airways | 1817011 |
| 9 | YV | Mesa Airlines Inc. | 487572 |


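Which carriers had the highest average carrier delay (in minutes) in 2019? Because missing delays were filled with 0, these averages are taken over all of a carrier's 2019 flights, not just the delayed ones.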

In [16]:
air \
    .filter('year = 2019') \
    .groupBy('carrier_name') \
    .agg(
        F.sum('CARRIER_DELAY').alias('total_minutes_delayed'),
        F.count('*').alias('flight_cnt'),
        F.avg(F.col('CARRIER_DELAY')).alias('avg_delay'),
        F.stddev(F.col('CARRIER_DELAY')).alias('std_delay')
    ) \
    .orderBy(F.desc('avg_delay')) \
    .limit(10).toPandas()

|   | carrier_name | total_minutes_delayed | flight_cnt | avg_delay | std_delay |
|---|---|---|---|---|---|
| 0 | JetBlue Airways | 1225544.0 | 172933 | 7.086814 | 34.051539 |
| 1 | ExpressJet Airlines LLC | 540128.0 | 79142 | 6.824796 | 45.065704 |
| 2 | SkyWest Airlines Inc. | 2833063.0 | 481377 | 5.885331 | 45.766722 |
| 3 | Mesa Airlines Inc. | 702506.0 | 131512 | 5.341763 | 36.72102 |
| 4 | American Airlines Inc. | 2722610.0 | 548425 | 4.964416 | 33.32184 |
| 5 | Allegiant Air | 323610.0 | 65475 | 4.942497 | 37.037561 |
| 6 | Frontier Airlines Inc. | 350478.0 | 73626 | 4.760248 | 22.532748 |
| 7 | Endeavor Air Inc. | 579008.0 | 147178 | 3.934066 | 28.76362 |
| 8 | Delta Air Lines Inc. | 2200978.0 | 571071 | 3.854123 | 30.858868 |
| 9 | United Air Lines Inc. | 1335354.0 | 361121 | 3.697802 | 28.520798 |


### Creating the GraphFrame Object

Now we create the vertex and edge DataFrames. A GraphFrame object needs both DataFrames in order to be instantiated.
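A GraphFrame expects the vertex DataFrame to have an `id` column and the edge DataFrame to have `src` and `dst` columns; any other columns are carried along as attributes. Because the vertices built below come from an inner join with the airport lookup, it is worth confirming that every edge endpoint has a matching vertex. The following is a plain-Python sketch of that layout and check on made-up rows (the airport `XYZ` and the helper `dangling_endpoints` are hypothetical). In Spark the same check amounts to subtracting the vertex ids from the union of `src` and `dst`.

```python
# Toy vertex and edge rows in the shape GraphFrames expects (hypothetical data,
# not the BTS files): vertices carry an "id", edges carry "src" and "dst".
vertices = [
    {"id": "PHL", "airport_city": "Philadelphia, PA"},
    {"id": "ORD", "airport_city": "Chicago, IL"},
    {"id": "HNL", "airport_city": "Honolulu, HI"},
]
edges = [
    {"src": "PHL", "dst": "ORD", "carrier_code": "AA", "year": 2019},
    {"src": "ORD", "dst": "HNL", "carrier_code": "UA", "year": 2019},
    {"src": "ORD", "dst": "XYZ", "carrier_code": "UA", "year": 2019},  # XYZ has no vertex row
]


def dangling_endpoints(vertices, edges):
    """Return edge endpoints (src or dst) that have no matching vertex id."""
    vertex_ids = {v["id"] for v in vertices}
    endpoints = {e["src"] for e in edges} | {e["dst"] for e in edges}
    return endpoints - vertex_ids


print(dangling_endpoints(vertices, edges))  # {'XYZ'}
```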

In [17]:
# Edges
e = air.selectExpr('ORIGIN as src', 'DEST as dst', CARRIER_CODE, 'year') \
    .filter('src is not null') \
    .filter('dst is not null')
# Vertices
v0 = e.selectExpr('src as id').union(e.selectExpr('dst as id')).distinct()
v = v0 \
    .join(F.broadcast(airport_codes), v0.id==airport_codes[AIRPORT_CODE], how='inner')

Every DataFrame in Spark SQL has a query plan, much like a query in a traditional SQL database. You can inspect a DataFrame's plan by calling its `explain` method.

In [18]:
e.explain()

== Physical Plan ==
*(2) Project [ORIGIN#67 AS src#318, DEST#76 AS dst#319, carrier_code#137, year#92]
+- *(2) BroadcastHashJoin [carrier_code#137], [carrier_code#14], Inner, BuildRight
   :- *(2) Project [OP_UNIQUE_CARRIER#60 AS carrier_code#137, ORIGIN#67, DEST#76, year#92]
   :  +- *(2) Filter ((isnotnull(ORIGIN#67) && isnotnull(DEST#76)) && isnotnull(OP_UNIQUE_CARRIER#60))
   :     +- *(2) FileScan csv [OP_UNIQUE_CARRIER#60,ORIGIN#67,DEST#76,year#92] Batched: false, Format: CSV, Location: InMemoryFileIndex[s3://jornaya-ds-us-east-1-sandbox/csnyder/dot_carrier_data], PartitionCount: 7, PartitionFilters: [], PushedFilters: [IsNotNull(ORIGIN), IsNotNull(DEST), IsNotNull(OP_UNIQUE_CARRIER)], ReadSchema: struct<OP_UNIQUE_CARRIER:string,ORIGIN:string,DEST:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *(1) Project [Code#10 AS carrier_code#14]
         +- *(1) Filter isnotnull(Code#10)
            +- *(1) FileScan csv [Code#10] Batched: 

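Checkpointing in Spark is a way to truncate a DataFrame's query plan (its lineage): Spark computes the DataFrame and writes the result to files in the checkpoint directory set during setup, and later operations read from those files instead of re-running the whole plan. This is very useful when you know you are going to use a DataFrame more than once, as we will with the vertex and edge DataFrames. Note that `checkpoint()` returns a new DataFrame, which is why the result is reassigned.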

In [19]:
e = e.checkpoint()
v = v.checkpoint()

In [20]:
v.show()

+---+------------+--------------------+--------------------+
| id|airport_code|        airport_city|        airport_name|
+---+------------+--------------------+--------------------+
|INL|         INL|International Fal...|Falls Internation...|
|BUR|         BUR|         Burbank, CA|            Bob Hope|
|IDA|         IDA|     Idaho Falls, ID|Idaho Falls Regional|
|EUG|         EUG|          Eugene, OR|  Mahlon Sweet Field|
|PVD|         PVD|      Providence, RI|Theodore Francis ...|
|EAR|         EAR|         Kearney, NE|    Kearney Regional|
|DCA|         DCA|      Washington, DC|Ronald Reagan Was...|
|RDM|         RDM|    Bend/Redmond, OR|       Roberts Field|
|EVV|         EVV|      Evansville, IN| Evansville Regional|
|CWA|         CWA|         Mosinee, WI|   Central Wisconsin|
|TYR|         TYR|           Tyler, TX|Tyler Pounds Regi...|
|DIK|         DIK|       Dickinson, ND|Dickinson - Theod...|
|GST|         GST|        Gustavus, AK|    Gustavus Airport|
|TOL|         TOL|      

In [21]:
e.explain()

== Physical Plan ==
Scan ExistingRDD[src#318,dst#319,carrier_code#137,year#92]


Create the GraphFrame object.

In [22]:
g = GraphFrame(v, e)

Once the GraphFrame is instantiated, the underlying vertex and edge DataFrames are available through its `vertices` and `edges` attributes:

In [23]:
g.vertices.limit(5).toPandas()

|   | id | airport_code | airport_city | airport_name |
|---|---|---|---|---|
| 0 | INL | INL | International Falls, MN | Falls International Einarson Field |
| 1 | BUR | BUR | Burbank, CA | Bob Hope |
| 2 | IDA | IDA | Idaho Falls, ID | Idaho Falls Regional |
| 3 | EUG | EUG | Eugene, OR | Mahlon Sweet Field |
| 4 | PVD | PVD | Providence, RI | Theodore Francis Green State |


In [24]:
g.edges.limit(5).toPandas()

|   | src | dst | carrier_code | year |
|---|---|---|---|---|
| 0 | JFK | LAX | AA | 2016 |
| 1 | LAX | JFK | AA | 2016 |
| 2 | JFK | LAX | AA | 2016 |
| 3 | DFW | HNL | AA | 2016 |
| 4 | OKC | DFW | AA | 2016 |


The way we've set up our graph data:
* vertices represent airports
* edges represent flights between airports

Note also that we can have an arbitrary set of additional columns in both the vertex and edge DataFrames.
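Since every flight is its own edge, the graph is a multigraph: a route flown many times, such as JFK to LAX, appears as many parallel edges between the same two airports. That matters when reading the results that follow (degrees count flights, and path or pattern queries return one row per combination of flights). When only the route structure matters, parallel edges can be collapsed into one weighted edge per route. A plain-Python sketch on made-up rows; in Spark the equivalent would be a `groupBy('src', 'dst').count()` on the edge DataFrame.

```python
from collections import Counter

# Hypothetical flight-level edges: one (src, dst) tuple per flight, so a route
# appears once per flight, just like the edge DataFrame above.
flights = [
    ("JFK", "LAX"), ("LAX", "JFK"), ("JFK", "LAX"),
    ("DFW", "HNL"), ("JFK", "LAX"),
]

# Collapse parallel edges into a single weighted edge per directed route.
route_weights = Counter(flights)

for (src, dst), n_flights in sorted(route_weights.items()):
    print(src, dst, n_flights)
```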

### InDegrees/OutDegrees

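Each edge in our graph has a direction: a flight (edge) departs from one airport (vertex) and arrives at another airport (vertex). A simple way to summarize this directionality is to look at the in-degree (number of incoming edges) and out-degree (number of outgoing edges) of each vertex. Since every flight is an edge, an airport's in-degree is its number of arriving flights and its out-degree is its number of departing flights.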
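As a plain-Python sketch on a made-up edge list, in-degree counts edges by `dst` and out-degree counts them by `src`. As with GraphFrames' `inDegrees` and `outDegrees`, a vertex with no incoming (or outgoing) edges is simply absent from the corresponding result rather than listed with 0.

```python
from collections import Counter

# Hypothetical directed edges (src, dst), one tuple per flight.
edges = [("ATL", "ORD"), ("ORD", "ATL"), ("ATL", "DFW"), ("DFW", "ATL"), ("ATL", "ORD")]

# In-degree: number of edges arriving at each vertex (grouped on dst).
in_degrees = Counter(dst for _, dst in edges)
# Out-degree: number of edges leaving each vertex (grouped on src).
out_degrees = Counter(src for src, _ in edges)

print(in_degrees.most_common())
print(out_degrees.most_common())
```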

In [25]:
%%time
g.inDegrees \
    .join(F.broadcast(airport_codes), g.inDegrees.id==airport_codes[AIRPORT_CODE], how='inner') \
    .select('airport_name', 'id', 'inDegree') \
    .orderBy(F.desc('inDegree')) \
    .limit(5).toPandas()

CPU times: user 10.5 ms, sys: 343 µs, total: 10.9 ms
Wall time: 3.3 s


|   | airport_name | id | inDegree |
|---|---|---|---|
| 0 | Hartsfield-Jackson Atlanta International | ATL | 2516381 |
| 1 | Chicago O'Hare International | ORD | 1943630 |
| 2 | Dallas/Fort Worth International | DFW | 1655584 |
| 3 | Denver International | DEN | 1491885 |
| 4 | Los Angeles International | LAX | 1435820 |


In [26]:
%%time
g.inDegrees \
    .join(F.broadcast(airport_codes), g.inDegrees.id==airport_codes[AIRPORT_CODE], how='inner') \
    .select('airport_name', 'id', 'inDegree') \
    .orderBy('inDegree') \
    .limit(5).toPandas()

CPU times: user 11.8 ms, sys: 1.61 ms, total: 13.4 ms
Wall time: 2.58 s


|   | airport_name | id | inDegree |
|---|---|---|---|
| 0 | Northern Colorado Regional | FNL | 1 |
| 1 | Youngstown-Warren Regional | YNG | 2 |
| 2 | Mobile Downtown | BFM | 66 |
| 3 | Florence Regional | FLO | 170 |
| 4 | Owensboro Daviess County Regional | OWB | 182 |


In [27]:
%%time
g.outDegrees \
    .join(F.broadcast(airport_codes), g.outDegrees.id==airport_codes[AIRPORT_CODE], how='inner') \
    .select('airport_name', 'id', 'outDegree') \
    .orderBy(F.desc('outDegree')) \
    .limit(5).toPandas()

CPU times: user 9.17 ms, sys: 200 µs, total: 9.37 ms
Wall time: 2.37 s


|   | airport_name | id | outDegree |
|---|---|---|---|
| 0 | Hartsfield-Jackson Atlanta International | ATL | 2516428 |
| 1 | Chicago O'Hare International | ORD | 1943471 |
| 2 | Dallas/Fort Worth International | DFW | 1655579 |
| 3 | Denver International | DEN | 1491960 |
| 4 | Los Angeles International | LAX | 1435746 |


In [28]:
%%time
g.outDegrees \
    .join(F.broadcast(airport_codes), g.outDegrees.id==airport_codes[AIRPORT_CODE], how='inner') \
    .select('airport_name', 'id', 'outDegree') \
    .orderBy('outDegree') \
    .limit(5).toPandas()

CPU times: user 9.91 ms, sys: 354 µs, total: 10.3 ms
Wall time: 2.12 s


|   | airport_name | id | outDegree |
|---|---|---|---|
| 0 | Wendover Airport | ENV | 1 |
| 1 | Tokeen Airport | TKI | 1 |
| 2 | Ellington | EFD | 1 |
| 3 | Middle Georgia Regional | MCN | 2 |
| 4 | Youngstown-Warren Regional | YNG | 2 |


### Motif-Finding

Motifs are simply structural patterns in a graph. GraphFrame motif finding uses a simple Domain-Specific Language (DSL) for expressing structural queries.

For example, `graph.find("(a)-[e]->(b); (b)-[e2]->(a)")` will search for pairs of vertices `a`,`b` connected by edges in both directions.

See the GraphFrames [user guide](https://graphframes.github.io/graphframes/docs/_site/user-guide.html#motif-finding) for more details on the DSL.
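Conceptually, the motif `(a)-[e]->(b); (b)-[e2]->(a)` is a self-join of the edges on `e.dst == e2.src` and `e.src == e2.dst`. Because each flight is a separate edge, every outbound flight gets paired with every return flight on the same route, and each airport pair shows up with `a`/`b` in both orientations; this is why the output below repeats the same SJU/CLE pair. A plain-Python sketch on made-up edges (the helper `find_reciprocal` is hypothetical, and it skips the join against the vertex DataFrame that GraphFrames also performs):

```python
# Hypothetical edges (src, dst, carrier); parallel edges are allowed.
edges = [
    ("SJU", "CLE", "UA"),
    ("CLE", "SJU", "UA"),
    ("CLE", "SJU", "UA"),
    ("PHL", "ORD", "AA"),
]


def find_reciprocal(edges):
    """Match (a)-[e]->(b); (b)-[e2]->(a): every ordered pair of edges where
    e runs a->b and e2 runs b->a (a self-join of the edge list)."""
    matches = []
    for e in edges:
        for e2 in edges:
            if e[0] == e2[1] and e[1] == e2[0]:
                matches.append({"a": e[0], "e": e, "b": e[1], "e2": e2})
    return matches


for m in find_reciprocal(edges):
    print(m["a"], m["b"], m["e"], m["e2"])
```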

In [29]:
%%time
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
motifs.limit(10).toPandas()

CPU times: user 8.69 ms, sys: 588 µs, total: 9.28 ms
Wall time: 9.83 s


|   | a | e | b | e2 |
|---|---|---|---|---|
| 0 | (SJU, SJU, San Juan, PR, Luis Munoz Marin Inte... | (SJU, CLE, UA, 2016) | (CLE, CLE, Cleveland, OH, Cleveland-Hopkins In... | (CLE, SJU, UA, 2016) |
| 1 | (SJU, SJU, San Juan, PR, Luis Munoz Marin Inte... | (SJU, CLE, UA, 2016) | (CLE, CLE, Cleveland, OH, Cleveland-Hopkins In... | (CLE, SJU, UA, 2016) |
| 2 | (SJU, SJU, San Juan, PR, Luis Munoz Marin Inte... | (SJU, CLE, UA, 2016) | (CLE, CLE, Cleveland, OH, Cleveland-Hopkins In... | (CLE, SJU, UA, 2016) |
| 3 | (SJU, SJU, San Juan, PR, Luis Munoz Marin Inte... | (SJU, CLE, UA, 2016) | (CLE, CLE, Cleveland, OH, Cleveland-Hopkins In... | (CLE, SJU, UA, 2015) |
| 4 | (SJU, SJU, San Juan, PR, Luis Munoz Marin Inte... | (SJU, CLE, UA, 2016) | (CLE, CLE, Cleveland, OH, Cleveland-Hopkins In... | (CLE, SJU, UA, 2015) |
| 5 | (SJU, SJU, San Juan, PR, Luis Munoz Marin Inte... | (SJU, CLE, UA, 2016) | (CLE, CLE, Cleveland, OH, Cleveland-Hopkins In... | (CLE, SJU, UA, 2015) |
| 6 | (SJU, SJU, San Juan, PR, Luis Munoz Marin Inte... | (SJU, CLE, UA, 2016) | (CLE, CLE, Cleveland, OH, Cleveland-Hopkins In... | (CLE, SJU, UA, 2015) |
| 7 | (SJU, SJU, San Juan, PR, Luis Munoz Marin Inte... | (SJU, CLE, UA, 2016) | (CLE, CLE, Cleveland, OH, Cleveland-Hopkins In... | (CLE, SJU, UA, 2013) |
| 8 | (SJU, SJU, San Juan, PR, Luis Munoz Marin Inte... | (SJU, CLE, UA, 2016) | (CLE, CLE, Cleveland, OH, Cleveland-Hopkins In... | (CLE, SJU, UA, 2014) |
| 9 | (SJU, SJU, San Juan, PR, Luis Munoz Marin Inte... | (SJU, CLE, UA, 2016) | (CLE, CLE, Cleveland, OH, Cleveland-Hopkins In... | (CLE, SJU, UA, 2014) |


### SubGraphs

It is easy to create and examine subgraphs with GraphFrames. Since the graph is backed by DataFrames, building a subgraph is as simple as applying record filters to the data.

Our data covers every carrier that operated domestic flights. Suppose we are only interested in the flights of American Airlines (carrier code `AA`).
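`filterEdges` removes edges but keeps every vertex, so airports that American Airlines never serves remain in the subgraph as vertices without edges. GraphFrames also provides `filterVertices` and `dropIsolatedVertices` for trimming the vertex side. A plain-Python sketch of those semantics on a toy graph (the helper names and rows are hypothetical):

```python
# Hypothetical toy graph: a set of vertex ids plus (src, dst, carrier) edges.
vertices = {"PHL", "ORD", "DFW", "HNL"}
edges = [("PHL", "ORD", "AA"), ("ORD", "DFW", "AA"), ("ORD", "HNL", "UA")]


def filter_edges(vertices, edges, keep):
    """Keep only edges satisfying `keep`; every vertex is retained."""
    return set(vertices), [e for e in edges if keep(e)]


def drop_isolated_vertices(vertices, edges):
    """Drop vertices that no remaining edge touches."""
    touched = {e[0] for e in edges} | {e[1] for e in edges}
    return {v for v in vertices if v in touched}, edges


v_aa, e_aa = filter_edges(vertices, edges, lambda e: e[2] == "AA")
print(sorted(v_aa), e_aa)  # HNL is still a vertex, but has no AA edges
v_aa_trimmed, _ = drop_isolated_vertices(v_aa, e_aa)
print(sorted(v_aa_trimmed))
```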

In [30]:
gAA = g.filterEdges("{} = 'AA'".format(CARRIER_CODE))

Which airports do American Airlines flights arrive at most often, and what share of all American Airlines arrivals does each account for?

In [31]:
%%time
gAA_inD_all = gAA.inDegrees \
    .join(F.broadcast(airport_codes), gAA.inDegrees.id==airport_codes[AIRPORT_CODE], how='inner') \
    .select('airport_name', 'id', 'inDegree')

inflightsum = gAA_inD_all.groupBy().sum('inDegree').collect()[0][0]

gAA_inD_all.withColumn('pct', F.col('inDegree') / inflightsum) \
    .orderBy(F.desc('inDegree')) \
    .limit(5).toPandas()

CPU times: user 15 ms, sys: 922 µs, total: 16 ms
Wall time: 5.73 s


|   | airport_name | id | inDegree | pct |
|---|---|---|---|---|
| 0 | Dallas/Fort Worth International | DFW | 970394 | 0.191194 |
| 1 | Charlotte Douglas International | CLT | 401740 | 0.079154 |
| 2 | Chicago O'Hare International | ORD | 383872 | 0.075633 |
| 3 | Miami International | MIA | 334461 | 0.065898 |
| 4 | Phoenix Sky Harbor International | PHX | 249304 | 0.04912 |


Which airports did American Airlines flights arrive at most often in 2019?

In [32]:
%%time
gAA_2019 = gAA.filterEdges("year = '2019'")

gAA_inD_2019 = gAA_2019.inDegrees \
    .join(F.broadcast(airport_codes), gAA_2019.inDegrees.id==airport_codes[AIRPORT_CODE], how='inner') \
    .select('airport_name', 'id', 'inDegree')

inflightsum_2019 = gAA_inD_2019.groupBy().sum('inDegree').collect()[0][0]

gAA_inD_2019.withColumn('pct', F.col('inDegree') / inflightsum_2019) \
    .orderBy(F.desc('inDegree')) \
    .limit(5).toPandas()

CPU times: user 20.9 ms, sys: 65 µs, total: 21 ms
Wall time: 5.92 s


|   | airport_name | id | inDegree | pct |
|---|---|---|---|---|
| 0 | Dallas/Fort Worth International | DFW | 88246 | 0.160908 |
| 1 | Charlotte Douglas International | CLT | 57245 | 0.104381 |
| 2 | Chicago O'Hare International | ORD | 38027 | 0.069339 |
| 3 | Phoenix Sky Harbor International | PHX | 33727 | 0.061498 |
| 4 | Miami International | MIA | 29660 | 0.054082 |


### Algorithm Tour

#### Breadth-First Search (BFS)
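`bfs(fromExpr, toExpr, maxPathLength=...)` searches outward from the vertices matching `fromExpr` one hop at a time and returns the shortest paths to vertices matching `toExpr`, one row per path, with edges in columns `e0`, `e1`, ... and intermediate vertices in `v1`, .... Because each flight is a separate edge, the same itinerary (same airports, carrier and year) comes back once per combination of flights, which is why the result is de-duplicated with `distinct()`. A plain-Python sketch of the search on made-up edges (the helper `bfs_shortest_paths` is hypothetical and does not reproduce GraphFrames' Spark implementation):

```python
from collections import defaultdict

# Hypothetical 2019 edges (src, dst, carrier); parallel edges are allowed.
edges = [
    ("PHL", "ORD", "AA"), ("PHL", "ORD", "AA"), ("ORD", "HNL", "UA"),
    ("PHL", "LAX", "AA"), ("LAX", "HNL", "AA"), ("HNL", "PHL", "AA"),
]


def bfs_shortest_paths(edges, source, target, max_path_length=10):
    """Return every shortest path (a list of edges) from source to target,
    expanding one hop per level; vertices reached on an earlier level are not revisited."""
    out_edges = defaultdict(list)
    for e in edges:
        out_edges[e[0]].append(e)
    paths = [[]]          # partial paths; the empty path sits at `source`
    visited = {source}
    for _ in range(max_path_length):
        next_paths, reached = [], set()
        for path in paths:
            tail = path[-1][1] if path else source
            for e in out_edges[tail]:
                if e[1] not in visited:
                    next_paths.append(path + [e])
                    reached.add(e[1])
        hits = [p for p in next_paths if p[-1][1] == target]
        if hits or not next_paths:
            return hits
        visited |= reached
        paths = next_paths
    return []


paths = bfs_shortest_paths(edges, "PHL", "HNL", max_path_length=2)
# Parallel flights yield identical rows; collapse them, like .distinct() on the result.
distinct_paths = sorted({tuple(p) for p in paths})
for p in distinct_paths:
    print(p)
```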

In [33]:
%%time
g_bfs = g.filterEdges('year = 2019') \
    .bfs('id="PHL"', 'id="HNL"', maxPathLength=2)
g_bfs = g_bfs.checkpoint()

CPU times: user 7.36 ms, sys: 386 µs, total: 7.75 ms
Wall time: 30.4 s


In [34]:
g_bfsd = g_bfs.distinct()
g_bfsd = g_bfsd.checkpoint()

In [35]:
g_bfsd.drop('v1').limit(10).toPandas()

|   | from | e0 | e1 | to |
|---|---|---|---|---|
| 0 | (PHL, PHL, Philadelphia, PA, Philadelphia Inte... | (PHL, ORD, EV, 2019) | (ORD, HNL, AA, 2019) | (HNL, HNL, Honolulu, HI, Daniel K Inouye Inter... |
| 1 | (PHL, PHL, Philadelphia, PA, Philadelphia Inte... | (PHL, SEA, AA, 2019) | (SEA, HNL, DL, 2019) | (HNL, HNL, Honolulu, HI, Daniel K Inouye Inter... |
| 2 | (PHL, PHL, Philadelphia, PA, Philadelphia Inte... | (PHL, LAX, AA, 2019) | (LAX, HNL, AA, 2019) | (HNL, HNL, Honolulu, HI, Daniel K Inouye Inter... |
| 3 | (PHL, PHL, Philadelphia, PA, Philadelphia Inte... | (PHL, LAX, NK, 2019) | (LAX, HNL, AS, 2019) | (HNL, HNL, Honolulu, HI, Daniel K Inouye Inter... |
| 4 | (PHL, PHL, Philadelphia, PA, Philadelphia Inte... | (PHL, DTW, OH, 2019) | (DTW, HNL, DL, 2019) | (HNL, HNL, Honolulu, HI, Daniel K Inouye Inter... |
| 5 | (PHL, PHL, Philadelphia, PA, Philadelphia Inte... | (PHL, DEN, AA, 2019) | (DEN, HNL, UA, 2019) | (HNL, HNL, Honolulu, HI, Daniel K Inouye Inter... |
| 6 | (PHL, PHL, Philadelphia, PA, Philadelphia Inte... | (PHL, DEN, F9, 2019) | (DEN, HNL, UA, 2019) | (HNL, HNL, Honolulu, HI, Daniel K Inouye Inter... |
| 7 | (PHL, PHL, Philadelphia, PA, Philadelphia Inte... | (PHL, ATL, AA, 2019) | (ATL, HNL, DL, 2019) | (HNL, HNL, Honolulu, HI, Daniel K Inouye Inter... |
| 8 | (PHL, PHL, Philadelphia, PA, Philadelphia Inte... | (PHL, ATL, F9, 2019) | (ATL, HNL, DL, 2019) | (HNL, HNL, Honolulu, HI, Daniel K Inouye Inter... |
| 9 | (PHL, PHL, Philadelphia, PA, Philadelphia Inte... | (PHL, IAH, YV, 2019) | (IAH, HNL, UA, 2019) | (HNL, HNL, Honolulu, HI, Daniel K Inouye Inter... |


In [36]:
g_bfsd.drop('v1') \
    .filter(g_bfsd.e0[CARRIER_CODE] == 'AA') \
    .filter(g_bfsd.e1[CARRIER_CODE] == 'AA') \
    .limit(10).toPandas()

|   | from | e0 | e1 | to |
|---|---|---|---|---|
| 0 | (PHL, PHL, Philadelphia, PA, Philadelphia Inte... | (PHL, LAX, AA, 2019) | (LAX, HNL, AA, 2019) | (HNL, HNL, Honolulu, HI, Daniel K Inouye Inter... |
| 1 | (PHL, PHL, Philadelphia, PA, Philadelphia Inte... | (PHL, DFW, AA, 2019) | (DFW, HNL, AA, 2019) | (HNL, HNL, Honolulu, HI, Daniel K Inouye Inter... |
| 2 | (PHL, PHL, Philadelphia, PA, Philadelphia Inte... | (PHL, ORD, AA, 2019) | (ORD, HNL, AA, 2019) | (HNL, HNL, Honolulu, HI, Daniel K Inouye Inter... |
| 3 | (PHL, PHL, Philadelphia, PA, Philadelphia Inte... | (PHL, PHX, AA, 2019) | (PHX, HNL, AA, 2019) | (HNL, HNL, Honolulu, HI, Daniel K Inouye Inter... |


In [37]:
g_bfsd.count()

93

#### Connected Components

Let's run connected components on the entire dataset and see what happens.
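GraphFrames' `connectedComponents` ignores edge direction: two airports share a component if any chain of flights links them, in either direction, and each component is labeled with a numeric id (the large `component` values below). A minimal sketch of the same idea using union-find on a toy graph; the helper name and rows are hypothetical, and GraphFrames uses a different, distributed algorithm with its own id scheme.

```python
def connected_components(vertices, edges):
    """Map each vertex to a component id, ignoring edge direction, via union-find."""
    parent = {v: v for v in vertices}

    def find(x):
        # Follow parent links to the root, halving the path as we go.
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for src, dst in edges:
        root_src, root_dst = find(src), find(dst)
        if root_src != root_dst:
            parent[root_dst] = root_src  # merge the two components
    return {v: find(v) for v in vertices}


# Hypothetical airports and routes; INL and BUR have no edges in this subgraph.
vertices = ["SFO", "LAX", "JFK", "INL", "BUR"]
edges = [("SFO", "LAX"), ("JFK", "LAX")]
components = connected_components(vertices, edges)
print(components)
```

SFO and JFK land in the same component even though neither can reach the other along edge direction, because the direction is ignored.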

In [38]:
%%time
g_cc = g.connectedComponents()
g_cc = g_cc.checkpoint()

CPU times: user 8.73 ms, sys: 0 ns, total: 8.73 ms
Wall time: 43.9 s


In [39]:
g_cc.printSchema()

root
 |-- id: string (nullable = true)
 |-- airport_code: string (nullable = true)
 |-- airport_city: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- component: long (nullable = true)



In [40]:
g_cc.groupBy('component').count().orderBy(F.desc('component')).limit(10).toPandas()

|   | component | count |
|---|---|---|
| 0 | 8589934592 | 378 |


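Not that interesting, but also expected: all 378 airports fall into a single component. You would expect to be able to get from any US airport to any other, possibly with a few connections. Let's create a subgraph using only Virgin America (VX) flight data. Note that `filterEdges` keeps every vertex, so airports that Virgin America does not serve stay in the subgraph with no edges, and each becomes its own single-airport component.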

In [41]:
%%time
gVX = g.filterEdges('{} = "VX"'.format(CARRIER_CODE))
gVX_cc = gVX.connectedComponents()
gVX_cc = gVX_cc.checkpoint()

CPU times: user 6.19 ms, sys: 780 µs, total: 6.97 ms
Wall time: 24.9 s


In [42]:
gVX_cc.limit(5).toPandas()

|   | id | airport_code | airport_city | airport_name | component |
|---|---|---|---|---|---|
| 0 | INL | INL | International Falls, MN | Falls International Einarson Field | 8589934592 |
| 1 | BUR | BUR | Burbank, CA | Bob Hope | 34359738368 |
| 2 | IDA | IDA | Idaho Falls, ID | Idaho Falls Regional | 60129542144 |
| 3 | EUG | EUG | Eugene, OR | Mahlon Sweet Field | 85899345920 |
| 4 | PVD | PVD | Providence, RI | Theodore Francis Green State | 103079215104 |


In [43]:
gVX_cc.groupBy('component').count().orderBy(F.desc('count')).limit(5).toPandas()

|   | component | count |
|---|---|---|
| 0 | 171798691840 | 34 |
| 1 | 4810363371520 | 1 |
| 2 | 4260607557633 | 1 |
| 3 | 3324304687104 | 1 |
| 4 | 3444563771392 | 1 |


In [44]:
gVX_cc.filter('component = 171798691840') \
    .toPandas()

|   | id | airport_code | airport_city | airport_name | component |
|---|---|---|---|---|---|
| 0 | DCA | DCA | Washington, DC | Ronald Reagan Washington National | 171798691840 |
| 1 | AUS | AUS | Austin, TX | Austin - Bergstrom International | 171798691840 |
| 2 | LAS | LAS | Las Vegas, NV | McCarran International | 171798691840 |
| 3 | DEN | DEN | Denver, CO | Denver International | 171798691840 |
| 4 | ONT | ONT | Ontario, CA | Ontario International | 171798691840 |
| 5 | LAX | LAX | Los Angeles, CA | Los Angeles International | 171798691840 |
| 6 | ANC | ANC | Anchorage, AK | Ted Stevens Anchorage International | 171798691840 |
| 7 | JFK | JFK | New York, NY | John F. Kennedy International | 171798691840 |
| 8 | SLC | SLC | Salt Lake City, UT | Salt Lake City International | 171798691840 |
| 9 | MSY | MSY | New Orleans, LA | Louis Armstrong New Orleans International | 171798691840 |


#### Label Propagation Algorithm (LPA)
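Label propagation is a community-detection heuristic: each vertex starts in its own community and, at every superstep, adopts the most common label among its neighbors, for a fixed number of iterations (`maxIter`). It is computationally cheap, but convergence is not guaranteed and results can depend on how ties are broken. A minimal synchronous sketch in plain Python on a toy graph of two triangles joined by one edge; the smallest-label tie-break is an assumption made here for determinism, not GraphFrames' rule.

```python
from collections import Counter, defaultdict


def label_propagation(vertices, edges, max_iter=5):
    """Synchronous label propagation with edges treated as undirected.
    Ties go to the smallest label (an assumption of this sketch)."""
    neighbors = defaultdict(list)
    for a, b in edges:
        neighbors[a].append(b)
        neighbors[b].append(a)
    labels = {v: v for v in vertices}  # every vertex starts in its own community
    for _ in range(max_iter):
        new_labels = {}
        for v in vertices:
            counts = Counter(labels[n] for n in neighbors[v])
            if not counts:
                new_labels[v] = labels[v]  # isolated vertex keeps its label
                continue
            best = max(counts.values())
            new_labels[v] = min(lbl for lbl, c in counts.items() if c == best)
        if new_labels == labels:
            break  # no label changed: converged
        labels = new_labels
    return labels


# Two triangles joined by a single bridge edge (3, 4).
vertices = [1, 2, 3, 4, 5, 6]
edges = [(1, 2), (2, 3), (1, 3), (4, 5), (5, 6), (4, 6), (3, 4)]
labels = label_propagation(vertices, edges)
print(labels)
```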

In [45]:
g_lpa = g.labelPropagation(maxIter=5)
g_lpa = g_lpa.checkpoint()

In [46]:
g_lpa.printSchema()

root
 |-- id: string (nullable = true)
 |-- airport_code: string (nullable = true)
 |-- airport_city: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- label: long (nullable = true)



In [47]:
g_lpa.limit(5).toPandas()

|   | id | airport_code | airport_city | airport_name | label |
|---|---|---|---|---|---|
| 0 | MSY | MSY | New Orleans, LA | Louis Armstrong New Orleans International | 4655744548864 |
| 1 | OTZ | OTZ | Kotzebue, AK | Ralph Wien Memorial | 1589137899521 |
| 2 | FLG | FLG | Flagstaff, AZ | Flagstaff Pulliam | 4655744548864 |
| 3 | HHH | HHH | Hilton Head, SC | Hilton Head Airport | 4655744548864 |
| 4 | ABI | ABI | Abilene, TX | Abilene Regional | 4655744548864 |


In [48]:
g_lpa.groupBy('label').count().orderBy(F.desc('count')).limit(5).toPandas()

|   | label | count |
|---|---|---|
| 0 | 4655744548864 | 327 |
| 1 | 1589137899521 | 39 |
| 2 | 5583457484800 | 6 |
| 3 | 3075196583937 | 3 |
| 4 | 5102421147648 | 2 |


In [49]:
g_lpa.filter('label = 4655744548864') \
    .limit(10).toPandas()

|   | id | airport_code | airport_city | airport_name | label |
|---|---|---|---|---|---|
| 0 | MSY | MSY | New Orleans, LA | Louis Armstrong New Orleans International | 4655744548864 |
| 1 | FLG | FLG | Flagstaff, AZ | Flagstaff Pulliam | 4655744548864 |
| 2 | HHH | HHH | Hilton Head, SC | Hilton Head Airport | 4655744548864 |
| 3 | ABI | ABI | Abilene, TX | Abilene Regional | 4655744548864 |
| 4 | GRB | GRB | Green Bay, WI | Green Bay Austin Straubel International | 4655744548864 |
| 5 | TXK | TXK | Texarkana, AR | Texarkana Regional-Webb Field | 4655744548864 |
| 6 | LRD | LRD | Laredo, TX | Laredo International | 4655744548864 |
| 7 | HRL | HRL | Harlingen/San Benito, TX | Valley International | 4655744548864 |
| 8 | ILG | ILG | Wilmington, DE | New Castle | 4655744548864 |
| 9 | HYA | HYA | Hyannis, MA | Barnstable Municipal-Boardman/Polando Field | 4655744548864 |


In [50]:
g_lpa.filter('label = 1589137899521') \
    .toPandas()

|   | id | airport_code | airport_city | airport_name | label |
|---|---|---|---|---|---|
| 0 | OTZ | OTZ | Kotzebue, AK | Ralph Wien Memorial | 1589137899521 |
| 1 | SCC | SCC | Deadhorse, AK | Deadhorse Airport | 1589137899521 |
| 2 | BUR | BUR | Burbank, CA | Bob Hope | 1589137899521 |
| 3 | CDV | CDV | Cordova, AK | Merle K Mudhole Smith | 1589137899521 |
| 4 | FAI | FAI | Fairbanks, AK | Fairbanks International | 1589137899521 |
| 5 | SGU | SGU | St. George, UT | St George Regional | 1589137899521 |
| 6 | VEL | VEL | Vernal, UT | Vernal Regional | 1589137899521 |
| 7 | LMT | LMT | Klamath Falls, OR | Crater Lake Klamath Regional | 1589137899521 |
| 8 | WRG | WRG | Wrangell, AK | Wrangell Airport | 1589137899521 |
| 9 | DLG | DLG | Dillingham, AK | Dillingham Airport | 1589137899521 |


In [51]:
g_lpa.filter('label = 5583457484800') \
    .limit(10).toPandas()

|   | id | airport_code | airport_city | airport_name | label |
|---|---|---|---|---|---|
| 0 | ITO | ITO | Hilo, HI | Hilo International | 5583457484800 |
| 1 | GUM | GUM | Guam, TT | Guam International | 5583457484800 |
| 2 | KOA | KOA | Kona, HI | Ellison Onizuka Kona International at Keahole | 5583457484800 |
| 3 | PPG | PPG | Pago Pago, TT | Pago Pago International | 5583457484800 |
| 4 | LIH | LIH | Lihue, HI | Lihue Airport | 5583457484800 |
| 5 | OGG | OGG | Kahului, HI | Kahului Airport | 5583457484800 |


In [52]:
g_lpa.filter('label = 5102421147648') \
    .limit(10).toPandas()

|   | id | airport_code | airport_city | airport_name | label |
|---|---|---|---|---|---|
| 0 | HNL | HNL | Honolulu, HI | Daniel K Inouye International | 5102421147648 |
| 1 | SPN | SPN | Saipan, TT | Francisco C. Ada Saipan International | 5102421147648 |


#### PageRank
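PageRank scores a vertex by how much "random surfer" traffic it receives: with probability `resetProbability` the surfer jumps to a new starting vertex, otherwise it follows a random outgoing edge. Because every flight is an edge, busy routes carry more weight. Passing `tol` iterates until the scores change by less than the tolerance, `maxIter` runs a fixed number of iterations, and `sourceId` makes every reset jump back to one vertex (personalized PageRank), which helps explain why ORD dominates its own personalized ranking below. A plain-Python power-iteration sketch under stated assumptions: it normalizes scores to sum to 1 (the non-personalized GraphFrames scores below are on a different scale), and it assumes no vertex lacks outgoing edges.

```python
def pagerank(vertices, edges, reset_probability=0.15, max_iter=200, tol=1e-12, source=None):
    """Power-iteration PageRank on a directed multigraph (one edge per flight).

    Assumes every vertex has at least one outgoing edge. With `source` set,
    resets jump back to that vertex (personalized PageRank); otherwise they
    jump to a uniformly random vertex. Scores sum to 1.
    """
    out_degree = {v: 0 for v in vertices}
    for src, _ in edges:
        out_degree[src] += 1
    if source is None:
        reset = {v: 1.0 / len(vertices) for v in vertices}
    else:
        reset = {v: 1.0 if v == source else 0.0 for v in vertices}
    rank = dict(reset)
    for _ in range(max_iter):
        new_rank = {v: reset_probability * reset[v] for v in vertices}
        for src, dst in edges:
            # Each edge carries an equal share of its source's rank.
            new_rank[dst] += (1 - reset_probability) * rank[src] / out_degree[src]
        delta = sum(abs(new_rank[v] - rank[v]) for v in vertices)
        rank = new_rank
        if delta < tol:  # stop once scores barely change
            break
    return rank


vertices = ["ATL", "ORD", "DEN"]
edges = [("ATL", "ORD"), ("ORD", "ATL"), ("ATL", "DEN"), ("DEN", "ATL"), ("ORD", "DEN")]
ranks = pagerank(vertices, edges)
personalized = pagerank(vertices, edges, source="ORD")
print(ranks)
print(personalized)
```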

In [53]:
%%time
g_pr = g.pageRank(resetProbability=0.15, tol=0.01)
g_pr_v = g_pr.vertices.checkpoint()

CPU times: user 21.4 ms, sys: 4.02 ms, total: 25.4 ms
Wall time: 2min 14s


In [54]:
g_pr_v.orderBy(F.desc('pagerank')).limit(5).toPandas()

|   | id | airport_code | airport_city | airport_name | pagerank |
|---|---|---|---|---|---|
| 0 | ATL | ATL | Atlanta, GA | Hartsfield-Jackson Atlanta International | 19.737152 |
| 1 | ORD | ORD | Chicago, IL | Chicago O'Hare International | 16.855455 |
| 2 | DFW | DFW | Dallas/Fort Worth, TX | Dallas/Fort Worth International | 14.69252 |
| 3 | DEN | DEN | Denver, CO | Denver International | 13.442911 |
| 4 | LAX | LAX | Los Angeles, CA | Los Angeles International | 10.633261 |


In [55]:
%%time
g_pr_5 = g.pageRank(resetProbability=0.15, maxIter=5)
g_pr_5_v = g_pr_5.vertices.checkpoint()

CPU times: user 3.69 ms, sys: 4.34 ms, total: 8.03 ms
Wall time: 32.2 s


In [56]:
g_pr_5_v.orderBy(F.desc('pagerank')).limit(5).toPandas()

|   | id | airport_code | airport_city | airport_name | pagerank |
|---|---|---|---|---|---|
| 0 | ATL | ATL | Atlanta, GA | Hartsfield-Jackson Atlanta International | 21.306141 |
| 1 | ORD | ORD | Chicago, IL | Chicago O'Hare International | 17.981142 |
| 2 | DFW | DFW | Dallas/Fort Worth, TX | Dallas/Fort Worth International | 15.719129 |
| 3 | DEN | DEN | Denver, CO | Denver International | 14.007626 |
| 4 | LAX | LAX | Los Angeles, CA | Los Angeles International | 10.777033 |


In [57]:
%%time
g_pr_ORD = g.pageRank(resetProbability=0.15, maxIter=10, sourceId="ORD")
g_pr_ORD_v = g_pr_ORD.vertices.checkpoint()

CPU times: user 10.7 ms, sys: 2.74 ms, total: 13.4 ms
Wall time: 55.2 s


In [58]:
g_pr_ORD_v.orderBy(F.desc('pagerank')).limit(5).toPandas()

|   | id | airport_code | airport_city | airport_name | pagerank |
|---|---|---|---|---|---|
| 0 | ORD | ORD | Chicago, IL | Chicago O'Hare International | 0.194248 |
| 1 | ATL | ATL | Atlanta, GA | Hartsfield-Jackson Atlanta International | 0.050676 |
| 2 | DFW | DFW | Dallas/Fort Worth, TX | Dallas/Fort Worth International | 0.03328 |
| 3 | DEN | DEN | Denver, CO | Denver International | 0.028849 |
| 4 | LAX | LAX | Los Angeles, CA | Los Angeles International | 0.026747 |


In [59]:
%%time
g_pr_parallel = g.parallelPersonalizedPageRank(resetProbability=0.15, sourceIds=["ORD", "JNU", "PHL"], maxIter=10)
g_pr_parallel_v = g_pr_parallel.vertices.checkpoint()

CPU times: user 13 ms, sys: 337 µs, total: 13.3 ms
Wall time: 59.4 s


In [60]:
g_pr_parallel_v.orderBy(F.desc('pageranks')).drop('id').limit(5).toPandas()

|   | airport_code | airport_city | airport_name | pageranks |
|---|---|---|---|---|
| 0 | ORD | Chicago, IL | Chicago O'Hare International | [0.1942484689461799, 0.024150665305785652, 0.0... |
| 1 | ATL | Atlanta, GA | Hartsfield-Jackson Atlanta International | [0.0506758879045678, 0.02461232184837368, 0.05... |
| 2 | DFW | Dallas/Fort Worth, TX | Dallas/Fort Worth International | [0.03327998756177438, 0.019741051812792114, 0.... |
| 3 | DEN | Denver, CO | Denver International | [0.028849285153795885, 0.023137743862173148, 0... |
| 4 | LAX | Los Angeles, CA | Los Angeles International | [0.026746985966971316, 0.025543865040019483, 0... |


#### Shortest Path
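`shortestPaths(landmarks=[...])` returns, for each vertex, a map from landmark id to the number of hops on the shortest path from that vertex to the landmark. It follows edge direction and ignores edge attributes, so every hop counts as 1. A plain-Python sketch for a single landmark: running BFS outward from the landmark over reversed edges yields every vertex's hop count to it. The routes below are made up, the helper name is hypothetical, and vertices that cannot reach the landmark are simply absent from the result here.

```python
from collections import defaultdict, deque


def hops_to_landmark(edges, landmark):
    """Unweighted hop count from every vertex that can reach `landmark`
    along edge direction, found by BFS from the landmark over reversed edges."""
    incoming = defaultdict(set)
    for src, dst in edges:
        incoming[dst].add(src)
    dist = {landmark: 0}
    queue = deque([landmark])
    while queue:
        v = queue.popleft()
        for u in incoming[v]:
            if u not in dist:
                dist[u] = dist[v] + 1
                queue.append(u)
    return dist


# Hypothetical routes; PHL -> JFK exists, but nothing leads from JFK back to PHL.
edges = [
    ("MSY", "PHL"), ("FLG", "PHX"), ("PHX", "PHL"),
    ("OTZ", "ANC"), ("ANC", "SEA"), ("SEA", "PHL"), ("PHL", "JFK"),
]
print(hops_to_landmark(edges, "PHL"))
```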

In [61]:
%%time
g_sp = g.shortestPaths(landmarks=["PHL"])
g_sp = g_sp.checkpoint()

CPU times: user 6.38 ms, sys: 815 µs, total: 7.19 ms
Wall time: 40.2 s


In [62]:
g_sp.limit(5).toPandas()

|   | id | airport_code | airport_city | airport_name | distances |
|---|---|---|---|---|---|
| 0 | MSY | MSY | New Orleans, LA | Louis Armstrong New Orleans International | {'PHL': 1} |
| 1 | OTZ | OTZ | Kotzebue, AK | Ralph Wien Memorial | {'PHL': 3} |
| 2 | FLG | FLG | Flagstaff, AZ | Flagstaff Pulliam | {'PHL': 2} |
| 3 | HHH | HHH | Hilton Head, SC | Hilton Head Airport | {'PHL': 2} |
| 4 | ABI | ABI | Abilene, TX | Abilene Regional | {'PHL': 2} |


In [63]:
g_sp.printSchema()

root
 |-- id: string (nullable = true)
 |-- airport_code: string (nullable = true)
 |-- airport_city: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- distances: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = false)



In [64]:
g_sp.withColumn('distance', F.map_values(F.col('distances'))[0]) \
    .orderBy(F.desc('distance')).limit(20).toPandas()

|   | id | airport_code | airport_city | airport_name | distances | distance |
|---|---|---|---|---|---|---|
| 0 | SPN | SPN | Saipan, TT | Francisco C. Ada Saipan International | {'PHL': 4} | 4 |
| 1 | PPG | PPG | Pago Pago, TT | Pago Pago International | {'PHL': 3} | 3 |
| 2 | OME | OME | Nome, AK | Nome Airport | {'PHL': 3} | 3 |
| 3 | BET | BET | Bethel, AK | Bethel Airport | {'PHL': 3} | 3 |
| 4 | YAK | YAK | Yakutat, AK | Yakutat Airport | {'PHL': 3} | 3 |
| 5 | CDV | CDV | Cordova, AK | Merle K Mudhole Smith | {'PHL': 3} | 3 |
| 6 | WRG | WRG | Wrangell, AK | Wrangell Airport | {'PHL': 3} | 3 |
| 7 | TKI | TKI | Tokeen, AK | Tokeen Airport | {'PHL': 3} | 3 |
| 8 | BRW | BRW | Barrow, AK | Wiley Post/Will Rogers Memorial | {'PHL': 3} | 3 |
| 9 | GUM | GUM | Guam, TT | Guam International | {'PHL': 3} | 3 |


#### Triangle Count
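`triangleCount` counts, for each vertex, the triangles passing through it: sets of three airports that are pairwise connected. Edge direction is ignored and parallel edges count once, so the counts below reflect the route network rather than flight volume. A plain-Python sketch that makes this undirected, de-duplicated treatment explicit (made-up routes, hypothetical helper name):

```python
from collections import defaultdict
from itertools import combinations


def triangle_count(edges):
    """Triangles through each vertex, treating edges as undirected and
    collapsing parallel edges (self-loops are ignored)."""
    adj = defaultdict(set)
    for a, b in edges:
        if a != b:
            adj[a].add(b)
            adj[b].add(a)
    counts = {v: 0 for v in adj}
    for v, nbrs in adj.items():
        # Each pair of neighbors that are themselves connected closes a triangle at v.
        for x, y in combinations(sorted(nbrs), 2):
            if y in adj[x]:
                counts[v] += 1
    return counts


# Hypothetical routes: ATL-ORD is flown in both directions but counts once.
edges = [
    ("ATL", "ORD"), ("ORD", "ATL"), ("ORD", "DEN"), ("DEN", "ATL"),
    ("ATL", "CLT"), ("CLT", "ORD"), ("DEN", "SEA"),
]
print(triangle_count(edges))
```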

In [65]:
%%time
g_tr = g.triangleCount()
g_tr = g_tr.checkpoint()

CPU times: user 3.77 ms, sys: 388 µs, total: 4.15 ms
Wall time: 17.5 s


In [66]:
g_tr.orderBy(F.desc('count')) \
    .drop('id') \
    .limit(5).toPandas()

|   | count | airport_code | airport_city | airport_name |
|---|---|---|---|---|
| 0 | 2787 | ATL | Atlanta, GA | Hartsfield-Jackson Atlanta International |
| 1 | 2758 | DEN | Denver, CO | Denver International |
| 2 | 2712 | ORD | Chicago, IL | Chicago O'Hare International |
| 3 | 2465 | DFW | Dallas/Fort Worth, TX | Dallas/Fort Worth International |
| 4 | 2326 | CLT | Charlotte, NC | Charlotte Douglas International |
