## Qu. 3: Data Analysis and PageRank in Spark [40pt]

In this problem, you will learn how to implement the PageRank algorithm in Spark. <br>
The general computation should be done in Spark, and you may also include numpy operations whenever needed. <br>
Make sure the Page-Rank computation follows distributive map-reduce.

**Mathematical notations:** <br>
Assume that the **directed** graph  $G=(V,E)$  has  $n$  nodes (numbered $1, 2,...,n$)  $m$  edges. <br>
Let $E =[e_{ij}]_{n×n}$ is the adjencency matrix with $e_{ij}=1$ if there is a directed edge $i \to j$ and $e_{ij}=0$ otherwise. Denote by $d_i$ the out-degree of node $i$, and assume for simplicity that all nodes have positive out-degree (i.e. $d_i > 0 \: \forall i$, we have no dangling pages). <br>
Let $M =[m_{ij}]_{n×n}$  is an $n \times n$  stochastic matrix as defined in class, such that $m_{ij} = \frac{e_{ij}}{d_i}$ for any  $i,j∈\{1,..,n\}$. <br>
Let $\beta \in (0,1)$ be a probability of random surfer to go from a current web-page to a random neighbour via a random hyperlink, and let $1-\beta$ be the teleport probability, i.e the probability of going to a random page on the web.
The goal of the PageRank algorithm it to find the stationary distribution of the Markov Chain defined by the random surfer, or equivalently, the leading right eigenvector of the matrix $M' \equiv \beta M + \frac{1}{n}(1-\beta) {\bf 1}_n {\bf 1}_n^t$, where ${\bf 1}_n \equiv (1,1,..,1)^t \in \mathbb{R}^n$ is the vector of all ones of length $n$. <br>
When there are dangling pages, we set the corresponding row of $M$ to $\frac{1}{n}$ everywhere and the above formula remains valid.


**Implementation instructions:** <br>
You may choose to store the PageRank vector $r$ either in memory or as an RDD. <br>
But, the matrix $M$ of links is too large to store in memory, and you are allowed to store matrix $M$ only in an RDD. e.g.: <br>
`network_links = sc.textFile (”my-network.txt”)` if your network data is contained in the text file `"my-network.txt"`. <br>
On an actual cluster, an RDD is partitioned across the nodes of the cluster. However, you cannot then use the `M = data.collect()` command which fetches the entire RDD to a single machine at the driver node stores it as an array locally.


**Datasets:** <br>
We will compute PageRank for two network datasets, a *flight delays* network obtained from databricks, and the wikipedia network you extracted in question 1. We describe next the *flight delays* <br>

**Airline On-Time Performance and Causes of Flight Delays Database**. 
This database contains scheduled and actual departure and arrival times, and reason of delay.  
Reported by certified U.S. air carriers that account for at least one percent of domestic scheduled passenger revenues.   
The data is collected by the Office of Airline Information, Bureau of Transportation Statistics (BTS).  
Source:  
https://catalog.data.gov/dataset/airline-on-time-performance-and-causes-of-flight-delays

Several datasets related to flights are available in databricks at `"databricks-datasets/flights/"`. <br>
You can read about them in the Readme file `"/databricks-datasets/flights/README.md"` and the available links in it. <br>
We will use two flight-related datasets to build our network:
1. A `flightDelays` dataset, available at `"/databricks-datasets/flights/departuredelays.csv"`
2. An `airports` dataset, available at `"/databricks-datasets/flights/airport-codes-na.txt"`

The `airports` dataset will be used to define the nodes of a **directed** network, where each node corresponds to an airport. <br>
The `flightDelays` dataset will be used to define the edges of the network, where each record corresponds to a (delayed) flight between two airports. 

Below we supply code for loading the datasets into two Spark data-frames: `airports` and `delays`

In [0]:
# Required modules
import re
import sys
from operator import add
from pyspark.sql import functions as f

# Set File Paths
tripdelaysFilePath = "/databricks-datasets/flights/departuredelays.csv"
airportsnaFilePath = "/databricks-datasets/flights/airport-codes-na.txt"

# Obtain airports dataset
airports  = sqlContext.read.format("com.databricks.spark.csv").options(header='true', inferschema='true', delimiter='\t').load(airportsnaFilePath)
airports.registerTempTable("airports")

# Obtain departure Delays data
delays = sqlContext.read.format("com.databricks.spark.csv").options(header='true').load(tripdelaysFilePath)
delays.registerTempTable("delays")
delays.cache()

**3.a. [3pt]** Show the top 10 airport and top 10 delays from both dataframes in a nice table format

### a.

In [0]:
delays.limit(10).toPandas()

Unnamed: 0,date,delay,distance,origin,destination
0,2151800,108,290,ORD,MSP
1,2151800,142,772,ORD,DEN
2,2151303,16,1516,ORD,LAX
3,2151157,7,1316,ORD,LAS
4,2151818,55,1511,ORD,PDX
5,2151033,12,873,ORD,MCO
6,2150941,0,1499,ORD,SNA
7,2151320,17,1604,ORD,SFO
8,2151804,2,1497,ORD,SAN
9,2152000,17,119,ORD,GRR


In [0]:
airports.limit(10).toPandas()

Unnamed: 0,City,State,Country,IATA
0,Abbotsford,BC,Canada,YXX
1,Aberdeen,SD,USA,ABR
2,Abilene,TX,USA,ABI
3,Akron,OH,USA,CAK
4,Alamosa,CO,USA,ALS
5,Albany,GA,USA,ABY
6,Albany,NY,USA,ALB
7,Albuquerque,NM,USA,ABQ
8,Alexandria,LA,USA,AEX
9,Allentown,PA,USA,ABE


### b.
**3.b. [9pt]** Run sql commands to answer the following questions: <br>
**(i)** What US city incurs the most delays as an `origin` airport? Run an sql query to find out. <br>
**(ii)** We would like to know, for each origin airport and state combination, <br>
the average distance and delay of all outgoing flights with positive delays. <br>
In addition, in the same query, we would like to see the average state delay <br>
(regardless of origin airport) for each airport-state combination with positive delay.

Write an sql query that returns  `origin`, `state`, and also: <br>
`average distance` (mean distance between origin and destinations over all outgoing flights) <br>
`average delay` (average delay of all outgoing flights from an origin, with a positive delay) <br>
`average state delay` (average dealy over all outgloing flights from all airports in the same state, with a positive delay). 

The query should return the results sorted by decreasing `average state delay`, with only the first 10 rows shown.   <br>

`Hint:` use a `window function` among other sql commands. <br>
**(iii)** We would like to compute the PageRank vector only for `origin` nodes that have outgoing edges. <br>
Remove from the `delays` dataframe all the records of flights connecting to destination aiports that <br>
appear only as `destination` (also called dead-end nodes, or dangling page). <br>
That is, if a record contains as `origin` an airport called `ABC`, and as `destination` an airport called `XYZ`, <br>
you should keep it only if `XYZ` is an `origin` airport for another record. <br>
Use an sql command **inside python using Spark.sql**, and update the `delays` variable to contain the output.

In [0]:
# making a a lazily evaluated view:
delays.createOrReplaceTempView('delays')
airports.createOrReplaceTempView('airports')

#### i.

In [0]:
%sql
WITH delays_counts AS
(
         SELECT   Count(delay) AS delays_num,
                  origin
         FROM     delays
         WHERE    delay > 0
         GROUP BY origin ), city_with_origin AS
(
           SELECT     a.city,
                      d.delays_num,
                      d.origin
           FROM       airports      AS a
           INNER JOIN delays_counts AS d
           ON         a.iata = d.origin ), city_delays
(
         select   city,
                  sum(delays_num) AS delays_num
         FROM     city_with_origin
         GROUP BY city )
SELECT   *
FROM     city_delays
ORDER BY delays_num DESC limit 10;

city,delays_num
Chicago,46483
Atlanta,41828
Dallas,35859
Denver,30760
Houston,29648
Los Angeles,22684
New York,18348
Phoenix,17555
Las Vegas,16938
San Francisco,16552


#### ii.

In [0]:
%sql

WITH all_data AS
(
           SELECT     a.state,
                      d.*
           FROM       airports a
           INNER JOIN delays d
           ON         a.iata = d.origin ), average_dist AS
(
         SELECT   origin,
                  state,
                  Round(Avg(distance),3) AS average_origin_dist
         FROM     all_data
         GROUP BY state,
                  origin ), average_delay AS
(
         SELECT   origin,
                  state,
                  Round(Avg(delay), 3) AS average_origin_delay
         FROM     all_data
         WHERE    delay > 0
         GROUP BY state,
                  origin ), average_delay_per_state_table AS
(
         SELECT   state,
                  Round(Avg(delay), 3) AS average_delay_per_state
         FROM     all_data
         WHERE    delay > 0
         GROUP BY state ), final_table AS
(
           SELECT     a.state,
                      b.origin,
                      a.average_origin_dist,
                      b.average_origin_delay,
                      c.average_delay_per_state
           FROM       average_dist a
           INNER JOIN average_delay b
           ON         a.state = b.state
           AND        a.origin = b.origin
           INNER JOIN average_delay_per_state_table c
           ON         a.state = c.state )
SELECT   *
FROM     final_table
ORDER BY average_delay_per_state DESC limit 10

state,origin,average_origin_dist,average_origin_delay,average_delay_per_state
VT,BTV,373.959,67.782,67.782
SD,RAP,459.791,43.476,49.857
SD,FSD,417.799,52.219,49.857
ME,BGR,472.578,70.94,49.745
ME,PWM,400.497,46.609,49.745
VA,CHO,390.477,55.197,48.527
VA,ROA,317.438,68.038,48.527
VA,RIC,460.668,48.736,48.527
VA,PHF,489.303,46.555,48.527
VA,ORF,461.022,45.04,48.527


#### iii.

In [0]:
none_dangling_airports_query = """
WITH distinct_dest AS
(
         SELECT   destination
         FROM     delays
         GROUP BY destination ), distinct_origin AS
(
         SELECT   origin
         FROM     delays
         GROUP BY origin ), no_origin_arprts AS
(
           SELECT     destination
           FROM       distinct_origin o
           RIGHT JOIN distinct_dest d
           ON         o.origin = d.destination
           WHERE      origin IS NULL )
SELECT d.*
FROM   delays d LEFT anti
JOIN   no_origin_arprts a
ON     d.destination = a.destination
"""

delays_none_dangling_filtered= sql(none_dangling_airports_query)
delays_none_dangling_filtered.limit(10).display()

date,delay,distance,origin,destination
2011110,-3,311,SLC,BTM
2011530,-8,311,SLC,BTM
2021110,-3,311,SLC,BTM
2021955,55,311,SLC,BTM
2031110,48,311,SLC,BTM
2032151,3,311,SLC,BTM
2041110,-3,311,SLC,BTM
2042103,-3,311,SLC,BTM
2051110,-3,311,SLC,BTM
2052103,3,311,SLC,BTM


### c.

**3.c[6 pt]**
In this question we build an object representing the network of delayed flights connecting between airports, as a preparation for the PageRank algorithm. <br>
For this, we will only look at the `origin` and `destination` columns, not giving any weight to the `delay` time.  <br>
Each (`origin`, `destination`) pair should have at most one link in the the network, even if there are multipled delayed flights connecting them. <br>

Create a new `RDD` data structure of tuples called `ranks`, storing the initial PageRank value for each `origin` node. 
Set the initialization value as `1/n` for all nodes, where `n` is the number of `origin` node. <br>
We will ignore nodes that are only `destination` and do no not appear as `origin`, in order to avoid dangling pages and to simplify the calculations. <br>

Next, create another `RDD` data structure of tuples, named `links`, where each tuple is composed of the `origin` and an iterator (a `GroupByKey`) of the `destination`. <br>
Show the first $10$ rows of the resulting `links` and `ranks` `RDD` data structures.

In [0]:
network = delays_none_dangling_filtered.select('origin', 'destination').drop_duplicates()
N = network.select('origin').count()
ranks = network.rdd.map(lambda x: (x.origin, 1/N))
links = network.rdd.map(lambda x: (x.origin, x.destination)).groupByKey()

print('First 10 Ranks')
display(ranks.take(10))


_1,_2
ATL,0.0002475247524752476
DTW,0.0002475247524752476
ORD,0.0002475247524752476
PHL,0.0002475247524752476
DFW,0.0002475247524752476
ATL,0.0002475247524752476
BWI,0.0002475247524752476
DAL,0.0002475247524752476
DEN,0.0002475247524752476
DFW,0.0002475247524752476


In [0]:
print('First 10 Links')
display(links.mapValues(list).take(10))

_1,_2
ATL,"List(ABE, ABQ, ABY, AEX, AGS, ALB, ATW, AUS, AVL, AVP, BDL, BHM, BMI, BNA, BOS, BQK, BTR, BTV, BUF, BWI, BZN, CAE, CAK, CHA, CHO, CHS, CID, CLE, CLT, CMH, COS, CRW, CSG, CVG, DAB, DAL, DAY, DCA, DEN, DFW, DHN, DSM, DTW, EGE, ELP, EVV, EWR, EYW, FAR, FAY, FLL, FNT, FSD, FSM, FWA, GNV, GPT, GRB, GRR, GSO, GSP, GTR, HDN, HNL, HOU, HPN, HSV, IAD, IAH, ICT, ILM, IND, JAC, JAN, JAX, JFK, LAS, LAX, LEX, LFT, LGA, LIT, MCI, MCO, MDT, MDW, MEM, MGM, MHT, MIA, MKE, MLB, MLI, MLU, MOB, MSN, MSP, MSY, MTJ, MYR, OAJ, OKC, OMA, ONT, ORD, ORF, PBI, PDX, PHF, PHL, PHX, PIA, PIT, PNS, PSP, PVD, PWM, RDU, RIC, ROA, ROC, RSW, SAN, SAT, SAV, SBN, SDF, SEA, SFO, SGF, SHV, SJC, SJU, SLC, SMF, SNA, SRQ, STL, STT, SYR, TLH, TPA, TRI, TUL, TUS, TYS, VLD, VPS, XNA)"
ORD,"List(ABE, ABQ, ALB, ALO, ANC, ATL, ATW, AUS, AVL, AVP, AZO, BDL, BHM, BMI, BNA, BOI, BOS, BTV, BUF, BWI, BZN, CAE, CAK, CHA, CHO, CHS, CID, CLE, CLT, CMH, CMI, COS, CRW, CVG, DAY, DBQ, DCA, DEN, DFW, DLH, DSM, DTW, EGE, ELM, ELP, EVV, EWR, FAR, FLL, FNT, FSD, FWA, GRB, GRR, GSO, GSP, GUC, HDN, HNL, HPN, HSV, IAD, IAH, ICT, IND, JAC, JAN, JAX, JFK, LAN, LAS, LAX, LEX, LGA, LIT, LNK, LSE, MBS, MCI, MCO, MDT, MEM, MHT, MIA, MKE, MLI, MOB, MQT, MSN, MSP, MSY, MTJ, OKC, OMA, ORF, PBI, PDX, PHL, PHX, PIA, PIT, PNS, PSP, PVD, PWM, RAP, RDU, RIC, RNO, ROA, ROC, RST, RSW, SAN, SAT, SAV, SBN, SCE, SDF, SEA, SFO, SGF, SJC, SJU, SLC, SMF, SNA, SRQ, STL, STT, SUX, SYR, TOL, TPA, TUL, TUS, TVC, TYS, XNA)"
DTW,"List(ABE, ALB, ATL, ATW, AUS, AVP, BDL, BGR, BHM, BNA, BOS, BUF, BWI, CAE, CAK, CHA, CID, CLE, CLT, CMH, CRW, CVG, DCA, DEN, DFW, ELM, EVV, EWR, FLL, FWA, GRB, GRR, GSO, HSV, IAD, IAH, IND, JAX, JFK, LAN, LAS, LAX, LEX, LGA, LIT, MBS, MCI, MCO, MDT, MDW, MEM, MHT, MIA, MKE, MSN, MSP, MSY, MYR, OKC, OMA, ORD, ORF, PBI, PDX, PHL, PHX, PIA, PIT, PVD, RDU, RIC, ROA, ROC, RSW, SAN, SAT, SAV, SBN, SDF, SEA, SFO, SJU, SLC, SRQ, STL, TPA, TUL, TVC, TYS, XNA)"
PHL,"List(ABE, ATL, AUS, BDL, BHM, BNA, BOS, BTV, BWI, CLE, CLT, DCA, DEN, DFW, DTW, EWR, FLL, IAH, IND, JAX, LAS, LAX, LGA, MCI, MCO, MDW, MHT, MIA, MSP, MSY, ORD, PBI, PHX, PIT, PVD, RDU, RSW, SAN, SAT, SEA, SFO, SJU, SLC, STL, STT, SYR, TPA)"
DFW,"List(ABI, ABQ, ACT, AEX, AMA, ATL, AUS, BDL, BHM, BMI, BNA, BOS, BPT, BRO, BTR, BWI, CAE, CHA, CHS, CID, CLE, CLL, CLT, CMH, CMI, COS, CRP, CRW, CVG, DAY, DCA, DEN, DRO, DSM, DTW, EGE, ELP, EVV, EWR, FAR, FAT, FLL, FSD, FSM, FWA, GGG, GJT, GPT, GRR, GSO, GSP, GUC, HDN, HNL, HOU, HSV, IAD, IAH, ICT, IND, JAC, JAN, JAX, JFK, LAS, LAW, LAX, LBB, LCH, LEX, LFT, LGA, LIT, LRD, MAF, MCI, MCO, MEM, MFE, MGM, MIA, MKE, MLI, MLU, MOB, MSN, MSP, MSY, MTJ, OGG, OKC, OMA, ONT, ORD, ORF, PBI, PDX, PHL, PHX, PIA, PIT, PNS, PSP, RAP, RDU, RIC, RNO, RSW, SAN, SAT, SAV, SDF, SEA, SFO, SGF, SHV, SJC, SJT, SJU, SLC, SMF, SNA, SPS, STL, TLH, TPA, TUL, TUS, TXK, TYR, TYS, VPS, XNA)"
PHX,"List(ABQ, ANC, ATL, AUS, BFL, BNA, BOI, BOS, BUF, BUR, BWI, CLE, CLT, CMH, CVG, DCA, DEN, DFW, DRO, DSM, DTW, ELP, EWR, FAT, FLL, GEG, GJT, HNL, HOU, IAD, IAH, IND, JFK, KOA, LAS, LAX, LGA, LGB, LIH, LIT, MCI, MCO, MDW, MIA, MKE, MRY, MSP, MSY, OAK, OGG, OKC, OMA, ONT, ORD, PDX, PHL, PIT, PSP, RDU, RNO, SAN, SAT, SBA, SBP, SDF, SEA, SFO, SJC, SLC, SMF, SNA, STL, TPA, TUL, TUS, YUM)"
SAN,"List(ABQ, ATL, AUS, BNA, BOI, BOS, BWI, CLE, CLT, DCA, DEN, DFW, DTW, EWR, HNL, HOU, IAD, IAH, JFK, LAS, LAX, LIH, MCI, MCO, MDW, MIA, MSP, OAK, OGG, ORD, PDX, PHL, PHX, RNO, SAT, SEA, SFO, SJC, SLC, SMF, STL, TUS)"
SEA,"List(ABQ, ANC, ATL, AUS, BOS, BUR, CLE, CLT, COS, CVG, DCA, DEN, DFW, DTW, EWR, FAI, FAT, FLL, GEG, HDN, HNL, IAD, IAH, JAC, JFK, JNU, KOA, KTN, LAS, LAX, LGB, LIH, MCI, MCO, MDW, MIA, MKE, MSP, OAK, OGG, OMA, ONT, ORD, PDX, PHL, PHX, PSP, RNO, SAN, SAT, SBA, SFO, SJC, SLC, SMF, SNA, STL, TUS)"
SFO,"List(ABQ, ACV, ANC, ATL, AUS, BFL, BOI, BOS, BUR, BWI, BZN, CEC, CIC, CLE, CLT, COS, CVG, DCA, DEN, DFW, DTW, EUG, EWR, FAT, FLL, HNL, IAD, IAH, IND, JAC, JFK, KOA, LAS, LAX, LGB, LIH, MCI, MCO, MDW, MFR, MIA, MKE, MOD, MRY, MSP, MSY, OGG, OKC, ONT, ORD, PDX, PHL, PHX, PIT, PSC, PSP, RDD, RDM, RDU, RNO, SAN, SAT, SBA, SBP, SEA, SLC, SMF, SNA, STL, SUN, TUS)"
SLC,"List(ABQ, ANC, ATL, AUS, BIL, BNA, BOI, BOS, BTM, BUR, BWI, BZN, CDC, CLT, COD, COS, CPR, CVG, DCA, DEN, DFW, DTW, EKO, EUG, EWR, FAR, FAT, FCA, GEG, GJT, GTF, HLN, HNL, IAH, IDA, IND, JAC, JFK, LAS, LAX, LGB, MCI, MCO, MDW, MEM, MFR, MSN, MSO, MSP, MSY, OAK, OKC, OMA, ONT, ORD, PDX, PHL, PHX, PIH, PSC, PSP, RAP, RDM, RDU, RNO, SAN, SAT, SEA, SFO, SGU, SJC, SMF, SNA, STL, SUN, TUL, TUS, TWF)"


### d.

**3.d. [6pt]** 
Recall the PageRank algorithm: 
1. Set $r = \frac{1}{n} {\bf 1}_n$
2. For $i=1$ to $I$: <br>
&nbsp;&nbsp;     Set $r \leftarrow \frac{1-\beta}{n} {\bf 1}_n + \beta M^t r$

The conts function below, is used to create an iterator that transfers from the ranks vector $r$ to $M r$ , that is used in each iteration of the algorithm
(in part 2 above). <br>
Using the conts function, join the ranks to the links data, and create an `RDD` object called `contrib`, which stores for each node $j$ the 
sum $\sum_{i=1}^n m_{ij} r_i$, i.e. the contributions of the PageRank scores over all of the nodes that link to it. <br>
Display the top $10$ values of the resulting `RDD`

**Hint:** First, use `flatmap` to obtain the contribution $m_{ij} r_i$ for each link $i \to j$. Then, reduce to sum the contributions from all links going into the same `destination` node $j$.

Next, update the `ranks` vector using the resulting `contrib` according to the Page Rank algorithm, with $\beta=0.85$. This completes one iteration of the algorithm. <br>
Display the first $10$ values of the resulting `ranks` `RDD`

In [0]:
# Converts nodes iterator and ranks vec to ranks normalized by out-degree  
def conts(nodes, rank): 
    """For each node in the graph calculate the number of connected  nodes, 
    and for each provide an node, adjusted rank by size of connected nodes:
    """
    num_nodes = len(nodes)
    for node in nodes:
        yield (node, rank / num_nodes)


In [0]:
BETA = 0.85
contrib = links.join(ranks).flatMap(lambda x: conts(x[1][0], x[1][1]))
ranks = contrib.reduceByKey(add).mapValues(lambda value: (value*BETA) + ((1-BETA)/N))
display(ranks.take(10))

_1,_2
ABE,0.0008787128712871311
ATL,0.0313861386138613
AUS,0.0078217821782178
BHM,0.0040346534653465
BOS,0.0109777227722771
BTV,0.0019306930693069
CLT,0.0141336633663365
DCA,0.0090841584158415
DEN,0.0259158415841584
DTW,0.0187623762376237


### e.
**3.e. [6pt]:** The above code implemented one iteration of the PageRank algorithm. <br>
Use a loop to apply $50$ iterations starting from the uniform initialization, and with a `beta` of $0.85$. <br>
Show the $10$ airports with the highest PageRank score, along with their PageRank values, and City name.

In [0]:
## PageRank algorithm from scratch
BETA = 0.85
network = delays_none_dangling_filtered.select('origin', 'destination').drop_duplicates()
N = network.select('origin').count()
ranks = network.rdd.map(lambda x: (x.origin, 1/N))
links = network.rdd.map(lambda x: (x.origin, x.destination)).groupByKey()
for i in range(50):
  contrib = links.join(ranks).flatMap(lambda x: conts(x[1][0], x[1][1]))
  ranks = contrib.reduceByKey(add).mapValues(lambda value: (value*BETA) + ((1-BETA)/N))

## Display Results
results_ranks = ranks.toDF(["Airport", "Rank"])

results_ranks.alias('r')\
.join(airports.alias('a'), f.col('r.Airport') == f.col('a.IATA'))\
.select('a.City', 'r.Airport', f.round('r.Rank',5).alias("Rank"))\
.orderBy(f.col('Rank').desc())\
.limit(10)\
.display()

City,Airport,Rank
Atlanta,ATL,0.00255
Chicago,ORD,0.00228
Dallas,DFW,0.00227
Denver,DEN,0.00185
Houston,IAH,0.00154
Minneapolis,MSP,0.00135
Salt Lake City,SLC,0.00133
Detroit,DTW,0.00127
Los Angeles,LAX,0.00114
San Francisco,SFO,0.00111


### f.

**3.e [4pt]** In this sub-question we run the PageRank algoithm on the much larger, `wikiepdia` dataset. <br>
Load the `wikipedia` network dataset file created in Quesion 1. <br> 
Your uploaded datasets can be accessed via:  `dbfs:/FileStore/shared_uploads/your.account.email@whatever.ending.you.have.com/` <br>
You should upload into two `RDD` objects using the `sc.textFile` command: <br>
One, called `keyvalue` containing the nodes, uploaded from the `keyvalue` file. <br> 
Another, called `transition` containing the edges, uploaded from the file created in Question 1.b., in the format of one edge (two IDs) per line. <br>
Display the top $10$ (nodes or edges) for each `RDD`

In [0]:
path = "dbfs:/FileStore/shared_uploads/yonatan.lourie@forter.com/"  # change to your name 

In [0]:
display(spark.read.option("delimiter","\t").csv(f'{path}keyvalue').withColumnRenamed("_c0", "node").withColumnRenamed("_c1", "url"))

node,url
0,https://en.wikipedia.org/wiki/Statistics
1,https://en.wikipedia.org/wiki/Category:Statistics
2,https://en.wikipedia.org/wiki/Portal:Mathematics
3,https://en.wikipedia.org/wiki/Normal_distribution
4,https://en.wikipedia.org/wiki/Scatter_plot
5,https://en.wikipedia.org/wiki/Iris_flower_data_set
6,https://en.wikipedia.org/wiki/Data
7,https://en.wikipedia.org/wiki/Statistical_model
8,https://en.wikipedia.org/wiki/Statistical_survey
9,https://en.wikipedia.org/wiki/Experimental_design


In [0]:
keyvalue_df = spark.read.option("delimiter","\t").csv(f'{path}keyvalue').withColumnRenamed("_c0", "node").withColumnRenamed("_c1", "url")
transition_df = spark.read.option("delimiter","\t").csv(f'{path}transition').withColumnRenamed("_c0", "node").withColumnRenamed("_c1", "neighbor")
N = keyvalue_df.count()
keyvalue = keyvalue_df.rdd.map(lambda x: (x.node, 1/N))
print("Top 10 keyValue")
display(keyvalue.take(10))
print("Top 10 transitions")
display(transition_df.take(10))

_1,_2
0,9.971750032158894e-07
1,9.971750032158894e-07
2,9.971750032158894e-07
3,9.971750032158894e-07
4,9.971750032158894e-07
5,9.971750032158894e-07
6,9.971750032158894e-07
7,9.971750032158894e-07
8,9.971750032158894e-07
9,9.971750032158894e-07


node,neighbor
0,1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444450446447448449450451452453454
40,455456457458459460461462463464465505146646746846947047147247347447547647747847948048148248348448548648748838748949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353423353525053653753853924124023554023654154254354454554654754854955055155255355455544255655744540447448449450451452453454
1,442255805595605614451447448449450451452453454
44,562563564565566567568569570571572573574575576577578471434795794705805815825835845855865875885895905917759259359459559659759859923360060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867944244544447448449450451452453454
35,26427680681682431255683684685686687688689690691692693694695696697698699700701380702703365704705706347071327087097107112523325023524771244271344535714447448449450451452453454
34,2668035715716690317177187197207217227237247256877263977277287297307317327333273432173535373673723523624023344244534738447448449450451452453454
33,739326807407157417427437448483745746747748749075036751311373483507521647537547557567577587593307607617627637647652876676776876977077177277348077477577677743177877978078121378278378478578678778878979079179279379479579620779779879980080180280380480580680780880981081181281381481581681781881982082182282382482551582682782882983083183283383483583683783883984084184253684384484584684784884985085185285385485535385623323523623824024785744285844533859447448449450451452453454
39,395730860861691862863864865866867868869870871872873874875876877878879880881882883884885726886887888889890891892893894895896897698898899900901902903904905906907908909910911434912913914269159169179189199209217889229239249254202233926235419686927928929647383401428402403409708930931932933934196423398935936364392937938939940427941201444219429439449459469479489499509519529539549554244259560957442445958447448449450451452453454
30,31814201189599609619621169631461471271313401329649659667559679681764119969691661259709719729734858628974371294374975976977423978979480980981982383983984985986987988235233989990991992993994995996997872998999100010011002100370610041005100670544244530447448449450451452453454
24,118910127301007100810091010101110121013101410151016101710181019102014010212352472402362331022747323710231024141102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090442109144524447448449450451452453454


### g.

**3.f.[6pt]** We want to avoid dangling pages when running the PageRank algorithm. <br>
To do so, convert the transition `RDD` into a data-frame. 
In this data-frame, remove all links where the destination webpage does not appear as one of the source webpages (in similar to 1.c.). <br>
Repeat the process until you get a sub-network where every node is a source-node, 
i.e, has a positive out-degree (you may need to repeat the process more than once). 

**Remark** In order to access a Spark data-frame via Spark.sql, you need to first declare it. For example, if your data-frame is called `transition_df`, 
add to your code the line: `transition_df.createOrReplaceTempView("transition_df")`

In [0]:
transition_df = transition_df.withColumn('neighbor', f.explode(f.split(f.col('neighbor'), ",")))
transition_df.createOrReplaceTempView('transition_df')

print(f"Transition dataframe count before dangling pages removal {transition_df.count()}")

none_dangling_nodes_query = """
WITH distinct_neighbors AS
(
                SELECT DISTINCT(neighbor)
                FROM            transition_df ), distinct_nodes AS
(
                SELECT DISTINCT(node)
                FROM            transition_df ), dangling_nodes AS
(
           SELECT     neighbor
           FROM       distinct_nodes d
           RIGHT JOIN distinct_neighbors n
           ON         d.node = n.neighbor
           WHERE      node IS NULL )
SELECT t.*
FROM   transition_df t LEFT anti
JOIN   dangling_nodes d
ON     t.neighbor = d.neighbor
"""
transition_df = sql(none_dangling_nodes_query)
print(f"Transition dataframe count after dangling pages removal {transition_df.count()}")
transition = transition_df.rdd.map(lambda x: (x.node, x.neighbor))

### h.

**3.g.[6pt]** Finally, run the PageRank algorithm on the wikipedia netowrk, with 10 iterations and $\beta=0.85$, and a uniform ranks vector initialization. <br>
Your implementation should be similar to the implementation for the flight delays dataset. 
Show the 20 `wikipedia` pages with the highest PageRank values you got (the `url` along with their PageRank scores)

In [0]:
BETA = 0.85
wikipedia_ranks = keyvalue
wikipedia_links = transition.groupByKey()
for i in range(10):
  contrib = wikipedia_links.join(wikipedia_ranks).flatMap(lambda x: conts(x[1][0], x[1][1]))
  wikipedia_ranks = contrib.reduceByKey(add).mapValues(lambda value: (value*BETA) + ((1-BETA)/N))
  
ranked_df = wikipedia_ranks.toDF(['node', 'rank'])
ranked_df.join(keyvalue_df, ['node']).orderBy(f.col("rank").desc()).limit(20).display()

node,rank,url
452,0.002772082006941,https://en.wikipedia.org/wiki/Help:Contents
454,0.002772082006941,https://en.wikipedia.org/wiki/Special:SpecialPages
448,0.002772082006941,https://en.wikipedia.org/wiki/Wikipedia:Contents
451,0.002772082006941,https://en.wikipedia.org/wiki/Wikipedia:Contact_us
453,0.002772082006941,https://en.wikipedia.org/wiki/Help:Introduction
450,0.002772082006941,https://en.wikipedia.org/wiki/Wikipedia:About
588,8.770557924449039e-05,https://en.wikipedia.org/wiki/United_States
22980,1.766401930517793e-05,https://en.wikipedia.org/wiki/Alma_mater
7370,1.2582461894878893e-05,https://en.wikipedia.org/wiki/British_Empire
27661,1.2181758179415528e-05,https://en.wikipedia.org/wiki/Taiwan
