## The following section is for Colab Users.
### Just run the following code cells

In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://bitbucket.org/habedi/datasets/raw/b6769c4664e7ff68b001e2f43bc517888cbe3642/spark/spark-3.0.2-bin-hadoop2.7.tgz
!tar xf spark-3.0.2-bin-hadoop2.7.tgz
!rm -rf spark-3.0.2-bin-hadoop2.7.tgz*
!pip -q install findspark pyspark graphframes

[K     |████████████████████████████████| 281.3 MB 50 kB/s 
[K     |████████████████████████████████| 199 kB 66.9 MB/s 
[K     |████████████████████████████████| 154 kB 72.4 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
!wget https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.0-s_2.12/graphframes-0.8.2-spark3.0-s_2.12.jar -P /content/spark-3.0.2-bin-hadoop2.7/jars/
!cp /content/spark-3.0.2-bin-hadoop2.7/jars/graphframes-0.8.2-spark3.0-s_2.12.jar /content/spark-3.0.2-bin-hadoop2.7/graphframes-0.8.2-spark3.0-s_2.12.zip

--2022-07-02 11:30:36--  https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.0-s_2.12/graphframes-0.8.2-spark3.0-s_2.12.jar
Resolving repos.spark-packages.org (repos.spark-packages.org)... 99.84.208.62, 99.84.208.69, 99.84.208.75, ...
Connecting to repos.spark-packages.org (repos.spark-packages.org)|99.84.208.62|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 247882 (242K) [binary/octet-stream]
Saving to: ‘/content/spark-3.0.2-bin-hadoop2.7/jars/graphframes-0.8.2-spark3.0-s_2.12.jar’


2022-07-02 11:30:37 (1.32 MB/s) - ‘/content/spark-3.0.2-bin-hadoop2.7/jars/graphframes-0.8.2-spark3.0-s_2.12.jar’ saved [247882/247882]



In [3]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7"
os.environ["HADOOP_HOME"] = os.environ["SPARK_HOME"]

os.environ["PYSPARK_DRIVER_PYTHON"] = "jupyter"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[*] pyspark-shell"

In [4]:
import findspark
findspark.init()

In [5]:
!export PYSPARK_SUBMIT_ARGS="--master local[*] pyspark-shell"
!export PYSPARK_DRIVER_PYTHON=jupyter
!export PYSPARK_DRIVER_PYTHON_OPTS=notebook

In [6]:
from pyspark.sql import SparkSession
from graphframes import *

spark = SparkSession.builder.master("local[*]").appName("GraphFrames").getOrCreate()

In [7]:
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages graphframes:graphframes:0.8.1-spark3.0-s_2.12 pyspark-shell"

**************************************************************************
**************************************************************************
**************************************************************************

In [8]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Read departuredelays.csv in Edge DataFrame
### Read airport-codes-na.txt in Vertix DataFrame (the separator is Tab i.e sep = '\t' )

#### The US flight delays data set has five columns:
- The <b>date</b> column contains an integer like 02190925 . When converted, this maps to 02-19 09:25 am.
- The <b>delay</b> column gives the delay in minutes between the scheduled and actual departure times. Early departures show negative numbers.
- The <b>distance</b> column gives the distance in miles from the origin airport to the destination airport.
- The <b>origin</b> column contains the origin IATA airport code.
- The <b>destination</b> column contains the destination IATA airport code.

#### The airport-codes data set has four columns:
- The <b>IATA</b> column contains IATA airport code.
- The <b>City, State, and Country</b> columns contains information about the airport location. 

In [10]:
Edge_DF= spark.read.option("header",True).csv("/content/drive/MyDrive/pysparkGraph/departuredelays.csv")

Edge_DF.show(5,truncate=False)

+--------+-----+--------+------+-----------+
|date    |delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|6    |602     |ABE   |ATL        |
|01020600|-8   |369     |ABE   |DTW        |
|01021245|-2   |602     |ABE   |ATL        |
|01020605|-4   |602     |ABE   |ATL        |
|01031245|-4   |602     |ABE   |ATL        |
+--------+-----+--------+------+-----------+
only showing top 5 rows



In [11]:
VertixDF=spark.read.options(header='True',inferSchema='True',delimiter='\t') \
  .csv("/content/drive/MyDrive/pysparkGraph/airport-codes-na.txt")


In [None]:
#VertixDF=VertixDF.withColumnRenamed("IATA","id")

In [12]:
VertixDF.show(5,truncate=False)

+----------+-----+-------+----+
|City      |State|Country|IATA|
+----------+-----+-------+----+
|Abbotsford|BC   |Canada |YXX |
|Aberdeen  |SD   |USA    |ABR |
|Abilene   |TX   |USA    |ABI |
|Akron     |OH   |USA    |CAK |
|Alamosa   |CO   |USA    |ALS |
+----------+-----+-------+----+
only showing top 5 rows



In [9]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### In the vertix DataFrame, drop any duplicated rows with the same  IATA code.

In [13]:
VertixDF.count()

526

In [14]:
VertixDF=VertixDF.dropDuplicates(subset=["IATA"])


In [15]:
VertixDF.count()

524

### In the edges DataFrame:
- Rename the <b>date</b> columns to become <b>tripid</b>.
- Rename the <b>origin</b> columns to become <b>src</b>.
- Rename the <b>destination</b> columns to become <b>dst</b>.

In [16]:
Edge_DF=Edge_DF.withColumnRenamed("origin","src") \
    .withColumnRenamed("destination","dst")\
    .withColumnRenamed("date","tripid")

In [17]:
Edge_DF.show(2)

+--------+-----+--------+---+---+
|  tripid|delay|distance|src|dst|
+--------+-----+--------+---+---+
|01011245|    6|     602|ABE|ATL|
|01020600|   -8|     369|ABE|DTW|
+--------+-----+--------+---+---+
only showing top 2 rows



### In the Vertix DataFrame:
- Rename the <b>IATA</b> columns to become <b>id</b>.

In [18]:
VertixDF=VertixDF.withColumnRenamed("IATA","id")

### Create GraphFrame from Vertix and Edges DataFrames

In [19]:
GraphDF = GraphFrame(VertixDF,Edge_DF)

In [20]:
GraphDF.vertices.show()

+-------------------+-----+-------+---+
|               City|State|Country| id|
+-------------------+-----+-------+---+
|         Binghamton|   NY|    USA|BGM|
|            Lebanon|   NH|    USA|LEB|
|           Montreal|   PQ| Canada|YUL|
|         Dillingham|   AK|    USA|DLG|
|International Falls|   MN|    USA|INL|
|         Wolf Point|   MT|    USA|OLF|
|        New Orleans|   LA|    USA|MSY|
|            Toronto|   ON| Canada|YTO|
|            Spokane|   WA|    USA|GEG|
|              Havre|   MT|    USA|HVR|
|            Burbank|   CA|    USA|BUR|
|      Orange County|   CA|    USA|SNA|
|             Dryden|   ON| Canada|YHD|
|         Fort Dodge|   IA|    USA|FOD|
|          Green Bay|   WI|    USA|GRB|
|        Great Falls|   MT|    USA|GTF|
|              Homer|   AK|    USA|HOM|
|        Idaho Falls|   ID|    USA|IDA|
|      Sioux Lookout|   ON| Canada|YXL|
|       Grand Rapids|   MI|    USA|GRR|
+-------------------+-----+-------+---+
only showing top 20 rows



### Determine the number of airports

In [21]:
GraphDF.vertices.select('id').count()

524

### Determine the number of trips 

In [22]:
GraphDF.edges.count()

1391578

### What is the longest delay?

In [23]:
longest_delayPositive=float(GraphDF.edges.select("delay").rdd.max()[0])
longest_delayNegative=float(GraphDF.edges.select("delay").rdd.min()[0])
if longest_delayPositive> (-1*longest_delayNegative):
  print("the longest delay is :",longest_delayPositive)
else:
  print("the longest delay is :",(-1*longest_delayNegative))

the longest delay is : 995.0


In [24]:
from pyspark.sql.functions import *

In [25]:
GraphDF.edges.select(max("delay")).show(truncate=False)

+----------+
|max(delay)|
+----------+
|995       |
+----------+



In [26]:
GraphDF.edges.show(10)

+--------+-----+--------+---+---+
|  tripid|delay|distance|src|dst|
+--------+-----+--------+---+---+
|01011245|    6|     602|ABE|ATL|
|01020600|   -8|     369|ABE|DTW|
|01021245|   -2|     602|ABE|ATL|
|01020605|   -4|     602|ABE|ATL|
|01031245|   -4|     602|ABE|ATL|
|01030605|    0|     602|ABE|ATL|
|01041243|   10|     602|ABE|ATL|
|01040605|   28|     602|ABE|ATL|
|01051245|   88|     602|ABE|ATL|
|01050605|    9|     602|ABE|ATL|
+--------+-----+--------+---+---+
only showing top 10 rows



### Find out the number of delayed flights vs. early flights (flights that departed before actual time)

In [27]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
#GraphDF.edges.select([count(when(float(v)>0, v)).alias("delayed flights") for v in col("delay")]).show()


In [28]:
print("delayed flights : ",GraphDF.edges.withColumn("delay",col("delay").cast(DoubleType())).filter(col("delay") >0).count() )
print("early flights : ",GraphDF.edges.withColumn("delay",col("delay").cast(DoubleType())).filter(col("delay") <0).count() )

delayed flights :  591727
early flights :  668729


### What flight destinations departing SFO are most likely to have significant delays? Select the top 10
#### Hint: you should get the average delay for each destination for trips that depart from SFO only

In [None]:
#tripid|delay|distance|src|dst

In [29]:
paths = GraphDF.find("()-[e]->()")\
  .filter("e.src = 'SFO'")


In [30]:
import pyspark.sql.functions as funs

In [31]:
paths.select("e.src","e.dst","e.delay").groupBy("dst").agg(funs.avg('delay').alias("average delay for each destination")).orderBy('average delay for each destination',ascending=False).show(10,truncate=False)

+---+----------------------------------+
|dst|average delay for each destination|
+---+----------------------------------+
|JAC|30.78846153846154                 |
|OKC|24.822222222222223                |
|SUN|22.696629213483146                |
|COS|22.58888888888889                 |
|SAT|22.16                             |
|STL|20.203125                         |
|HNL|19.982608695652175                |
|ASE|19.846153846153847                |
|CEC|19.089820359281436                |
|MDW|18.771929824561404                |
+---+----------------------------------+
only showing top 10 rows



### Find the Incoming connections to the airport sorted in Desc. order.

In [32]:
#all in Incoming connections for each airport
GraphDF.inDegrees.orderBy(desc("inDegree")).show(10)

+---+--------+
| id|inDegree|
+---+--------+
|ATL|   90434|
|DFW|   66050|
|ORD|   61967|
|LAX|   53601|
|DEN|   50921|
|IAH|   42700|
|PHX|   39721|
|SFO|   38988|
|LAS|   32994|
|CLT|   28388|
+---+--------+
only showing top 10 rows



In [None]:
#tripid|delay|distance|src|dst

In [33]:
#Incoming connections to SFO
IncomingTo_SFO = GraphDF.find("()-[e]->()")\
  .filter("e.dst = 'SFO'").select("e.tripid","e.src","e.dst","e.delay","e.distance").orderBy("src",ascending=False)

In [34]:
IncomingTo_SFO.show(truncate=False)

+--------+---+---+-----+--------+
|tripid  |src|dst|delay|distance|
+--------+---+---+-----+--------+
|02010850|TUS|SFO|-3   |653     |
|02121454|TUS|SFO|-8   |653     |
|02021158|TUS|SFO|28   |653     |
|02020850|TUS|SFO|-6   |653     |
|02031449|TUS|SFO|19   |653     |
|02030850|TUS|SFO|27   |653     |
|02041454|TUS|SFO|108  |653     |
|02040830|TUS|SFO|-1   |653     |
|02051454|TUS|SFO|3    |653     |
|02050850|TUS|SFO|38   |653     |
|02061449|TUS|SFO|57   |653     |
|02060850|TUS|SFO|57   |653     |
|02071449|TUS|SFO|190  |653     |
|02070850|TUS|SFO|-5   |653     |
|02080850|TUS|SFO|29   |653     |
|02091158|TUS|SFO|209  |653     |
|02090850|TUS|SFO|0    |653     |
|02101449|TUS|SFO|61   |653     |
|02100850|TUS|SFO|10   |653     |
|02111454|TUS|SFO|-1   |653     |
+--------+---+---+-----+--------+
only showing top 20 rows



### Find the Outgoing connections from the airport sorted in Desc. order.

In [35]:
#all in Outgoing connections for each airport
GraphDF.outDegrees.orderBy(desc("outDegree")).show(10)

+---+---------+
| id|outDegree|
+---+---------+
|ATL|    91484|
|DFW|    68482|
|ORD|    64228|
|LAX|    54086|
|DEN|    53148|
|IAH|    43361|
|PHX|    40155|
|SFO|    39483|
|LAS|    33107|
|CLT|    28402|
+---+---------+
only showing top 10 rows



In [36]:
OutgoingFrom_SFO = GraphDF.find("()-[e]->()")\
  .filter("e.src = 'SFO'").select("e.tripid","e.src","e.dst","e.delay","e.distance").orderBy("dst",ascending=False)

In [37]:
OutgoingFrom_SFO.show(500,truncate=False)

+--------+---+---+-----+--------+
|tripid  |src|dst|delay|distance|
+--------+---+---+-----+--------+
|01081055|SFO|TUS|1    |653     |
|02020825|SFO|TUS|37   |653     |
|01311939|SFO|TUS|46   |653     |
|01010753|SFO|TUS|-4   |653     |
|01141055|SFO|TUS|1    |653     |
|01051934|SFO|TUS|-4   |653     |
|01161055|SFO|TUS|-7   |653     |
|01060753|SFO|TUS|4    |653     |
|01171055|SFO|TUS|6    |653     |
|01171939|SFO|TUS|-3   |653     |
|01221055|SFO|TUS|-4   |653     |
|01191939|SFO|TUS|1    |653     |
|01201055|SFO|TUS|-3   |653     |
|01121939|SFO|TUS|-7   |653     |
|01091055|SFO|TUS|21   |653     |
|01040753|SFO|TUS|-5   |653     |
|01101055|SFO|TUS|-1   |653     |
|01101939|SFO|TUS|-9   |653     |
|01111730|SFO|TUS|87   |653     |
|01281939|SFO|TUS|-8   |653     |
|01050750|SFO|TUS|-4   |653     |
|03051130|SFO|TUS|-8   |653     |
|03051813|SFO|TUS|9    |653     |
|02091939|SFO|TUS|97   |653     |
|02181130|SFO|TUS|-6   |653     |
|01061155|SFO|TUS|28   |653     |
|03061915|SFO|

### Use motif finding to answer this question: which delays could we blame on SFO?
#### Hint: this practically means that SFO is a transit station

In [38]:
GraphDF.find("()-[e]->(v2);(v2)-[e2]->()")\
  .filter("e.dst = 'SFO'").filter("e2.delay > 0").select("e.src","e.dst","e2.dst","e2.delay").orderBy("delay",ascending=False).show(truncate=False)

+---+---+---+-----+
|src|dst|dst|delay|
+---+---+---+-----+
|ABQ|SFO|IAH|99   |
|ABQ|SFO|JFK|99   |
|ABQ|SFO|BOS|99   |
|ABQ|SFO|MIA|99   |
|ABQ|SFO|DEN|99   |
|ABQ|SFO|SEA|99   |
|ABQ|SFO|LAX|99   |
|ABQ|SFO|LAX|99   |
|ABQ|SFO|PDX|99   |
|ABQ|SFO|AUS|99   |
|ABQ|SFO|RDD|99   |
|ABQ|SFO|DEN|99   |
|ABQ|SFO|SBP|99   |
|ABQ|SFO|SMF|99   |
|ABQ|SFO|BOI|99   |
|ABQ|SFO|JAC|99   |
|ABQ|SFO|JFK|99   |
|ABQ|SFO|LAX|99   |
|ABQ|SFO|AUS|99   |
|ABQ|SFO|MIA|99   |
+---+---+---+-----+
only showing top 20 rows



### Determine Airport Ranking in Desc. order using PageRank algorithm

In [39]:
results = GraphDF.pageRank(resetProbability=0.15, maxIter=5)

In [40]:
results.vertices.orderBy('pagerank',ascending=False).show()
results.edges.orderBy('weight',ascending=False).show()

+--------------+-----+-------+---+------------------+
|          City|State|Country| id|          pagerank|
+--------------+-----+-------+---+------------------+
|       Atlanta|   GA|    USA|ATL|30.852689637281415|
|        Dallas|   TX|    USA|DFW| 22.35090825185797|
|       Chicago|   IL|    USA|ORD|21.476110490648225|
|        Denver|   CO|    USA|DEN|15.864147080276451|
|   Los Angeles|   CA|    USA|LAX|14.200409160093676|
|       Houston|   TX|    USA|IAH|13.038206739080561|
| San Francisco|   CA|    USA|SFO|11.262952692947371|
|       Phoenix|   AZ|    USA|PHX| 10.61464841238288|
|Salt Lake City|   UT|    USA|SLC|  9.46228920900108|
|     Las Vegas|   NV|    USA|LAS| 8.571473320234631|
|       Seattle|   WA|    USA|SEA| 7.548959663023094|
|        Newark|   NJ|    USA|EWR| 7.180791090543533|
|       Orlando|   FL|    USA|MCO| 7.160926547658392|
|     Charlotte|   NC|    USA|CLT| 7.143654438927901|
|       Detroit|   MI|    USA|DTW| 6.890631890038128|
|   Minneapolis|   MN|    US

## Determine the most popular flights (single city hops)

In [41]:
GraphDF.find("()-[e]->(v2)")\
   .groupBy("v2.City").agg(funs.count('e.tripid').alias("flights numbers")).orderBy('flights numbers',ascending=False).take(1)

[Row(City='Atlanta', flights numbers=90434)]

In [42]:
#if you mean most popular flights (from same source to same destination)
GraphDF.edges.groupBy("src", "dst").agg(count("tripid")).orderBy("count(tripid)",ascending=False).show(10)

+---+---+-------------+
|src|dst|count(tripid)|
+---+---+-------------+
|SFO|LAX|         3232|
|LAX|SFO|         3198|
|LAS|LAX|         3016|
|LAX|LAS|         2964|
|JFK|LAX|         2720|
|LAX|JFK|         2719|
|ATL|LGA|         2501|
|LGA|ATL|         2500|
|LAX|PHX|         2394|
|PHX|LAX|         2387|
+---+---+-------------+
only showing top 10 rows



### Find and Save a Subragph that obtained from the following pattern:
#### The flight starts from an airport and return back to the same airport through 2 other airports.

In [43]:
#Create a new small graph with 30%
sample_edgesDF=GraphDF.edges.sample(withReplacement=False, fraction=0.3)
sample_verticesDF= GraphDF.vertices.sample(withReplacement=False, fraction=0.3)
subGraph=GraphFrame(sample_verticesDF, sample_edgesDF)


In [44]:
subGraphWithPattern=subGraph.find("(v1)-[e1]->(v2); (v2)-[e2]->(v3); (v3)-[e3]->(v1)")

In [45]:
subGraphWithPattern.show(truncate=False)

+---------------------------+-----------------------------+---------------------------+------------------------------+-----------------------+-----------------------------+
|v1                         |e1                           |v2                         |e2                            |v3                     |e3                           |
+---------------------------+-----------------------------+---------------------------+------------------------------+-----------------------+-----------------------------+
|[Albuquerque, NM, USA, ABQ]|[01021440, -4, 852, ABQ, MSP]|[Minneapolis, MN, USA, MSP]|[03311005, -4, 1109, MSP, PHX]|[Phoenix, AZ, USA, PHX]|[03312130, 20, 285, PHX, ABQ]|
|[Albuquerque, NM, USA, ABQ]|[01021440, -4, 852, ABQ, MSP]|[Minneapolis, MN, USA, MSP]|[03311005, -4, 1109, MSP, PHX]|[Phoenix, AZ, USA, PHX]|[03311740, 74, 285, PHX, ABQ]|
|[Albuquerque, NM, USA, ABQ]|[01021440, -4, 852, ABQ, MSP]|[Minneapolis, MN, USA, MSP]|[03311005, -4, 1109, MSP, PHX]|[Phoenix, AZ, USA

In [46]:
#City|State|Country| id
vertice_PatternDF=subGraphWithPattern.select("v1.City","v1.State","v1.Country","v1.id")

In [47]:
vertice_PatternDf=vertice_PatternDF.dropDuplicates(subset=["id"])

In [48]:
vertice_PatternDF.show(truncate=False)

+-----------+-----+-------+---+
|City       |State|Country|id |
+-----------+-----+-------+---+
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
|Albuquerque|NM   |USA    |ABQ|
+-----------+-----+-------+---+
only showing top 20 rows



In [49]:
vertice_PatternDF.count()

11881250196

In [68]:
vertice_PatternDF=vertice_PatternDF.union(subGraphWithPattern.select("v2.City","v2.State","v2.Country","v2.id"))

In [51]:
vertice_PatternDF=vertice_PatternDF.union(subGraphWithPattern.select("v3.City","v3.State","v3.Country","v3.id"))

In [71]:
vertice_PatternDF=vertice_PatternDF.dropDuplicates(subset=["id"])

In [56]:
vert=vertice_PatternDF.dropDuplicates(subset=["id"])

In [59]:
vertice_PatternDf.write.csv("/content/drive/MyDrive/pysparkGraph/out/Lab4Graph_verticesForSubGragh")

Py4JJavaError: ignored

In [74]:
# tripid|delay|distance|src|dst
edges_PatternDF=subGraphWithPattern.select("e1.tripid","e1.delay","e1.distance","e1.src","e1.dst")

In [76]:
e=edges_PatternDF.drop_duplicates(subset=["tripid"])

In [61]:
edges_PatternDF=edges_PatternDF.dropDuplicates(subset=["tripid"])

In [78]:
edges_PatternDF=edges_PatternDF.union(subGraphWithPattern.select("e2.tripid","e2.delay","e2.distance","e2.src","e2.dst"))

In [79]:
edges_PatternDF=edges_PatternDF.union(subGraphWithPattern.select("e3.tripid","e3.delay","e3.distance","e3.src","e3.dst"))

In [80]:
edges_PatternDF=edges_PatternDF.dropDuplicates(subset=["tripid"])

In [81]:
edges_PatternDF.write.csv("/content/drive/MyDrive/pysparkGraph/out/Lab4Graph_edgesForSubGragh")

Py4JJavaError: ignored