In [1]:
#!pip install pyspark

# COS 598 Homework 3 - Gregory Roberts

In [117]:
import pandas as pd
from pyspark import SparkConf, SparkContext
import pyspark.sql as psql
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
import pyspark.sql.types as T
from operator import add

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

# Task 2
-----

In [137]:
conf = SparkConf().setMaster("local").setAppName("PageRank")
sc = SparkContext.getOrCreate(conf=conf)
sqlc = SQLContext.getOrCreate(sc)

In [4]:
# Read file into Spark RDD
rdd = sc.textFile("/content/drive/MyDrive/link_data.txt")
# Print RDD
print(rdd.collect())

['0 2', '2 0', '1 2', '1 3', '3 2']


In [5]:
# Reduce to X instances and map Y's to X
linksRDD = rdd.map(lambda x: tuple(x.split(" "))).map(lambda x: (x[0], [x[1]])).reduceByKey(lambda x, y: x+y)
# Print RDD
print(linksRDD.collect())

[('0', ['2']), ('2', ['0']), ('1', ['2', '3']), ('3', ['2'])]


In [6]:
# 1) Initialize the page rank of every node as 1.

ranksRDD = linksRDD.map(lambda x: (x[0], 1.0))
# Print RDD
print(ranksRDD.collect())

[('0', 1.0), ('2', 1.0), ('1', 1.0), ('3', 1.0)]


In [7]:
# 2) During each iteration, let vertex v contribute rank(v)/|neighbors(v)| to its neighboring vertices, 
#    where rank(v) is the current page rank of vertex v and |neighbors(v)| denotes the number of vertices 
#    to which vertex v links (i.e., the number of outgoing edges of vertex v).

# This function takes elements from the joined datasets (linksRDD and ranksRDD)
# and computes the contribution to each outgoing link based on the current rank.
def computeContribs(node_rank):
    # Separates links and ranks
    _, (links, rank) = node_rank
    # Counts number of links
    nb_links = len(links)
    # Loop through links
    for link in links:
        # For each link divide rank by number of links
        yield link, rank / nb_links

In [8]:
# 4) Repeat steps 2 and 3 k times (you may set k = 10).

for iteration in range(10):
    # Compute contributions of each node where it links to
    contribs = linksRDD.join(ranksRDD).flatMap(computeContribs)
    # Use a full outer join to make sure, that not well connected nodes aren't dropped
    contribs = linksRDD.fullOuterJoin(contribs).mapValues(lambda x : x[1] or 0.0)
    # Sum up all contributions per link
    ranksRDD = contribs.reduceByKey(add)
    # Re-calculate ranks
    # 3) Set each vertex’s rank to 0.15 + 0.85 × (contributions from all vertices that have 
    #    edges pointing to the current vertex), where the contributions are computed in step 2.
    ranksRDD = ranksRDD.mapValues(lambda rank: rank * 0.85 + 0.15)

In [9]:
# 5) In the end, divide the page rank of every vertex by the total number of 
#    vertices we have in the input graph.

# Get the count of the original vertices
N = rdd.count()
# Divide each rank by the count
ranksRDD2 = ranksRDD.mapValues(lambda rank: rank / N)

In [48]:
# Your implementation should output a text file, where each line contains a vertex and its 
# page rank. With the test input text file shown above, your output text file should look like:
#    0 <page rank of 0>
#    1 <page rank of 1>
#    2 <page rank of 2>
#    3 <page rank of 3>

# Loop through RDD by sorted link and rank
for (link, rank) in sorted(ranksRDD2.collect()):
    # Print sorted link and rank
    print("%s %s" % (link, rank))

0 0.36844926958806634
1 0.03
2 0.35880073041193344
3 0.042749999999999996


# Task 3
-----

In [59]:
# (a) Create a Spark data frame from the RDD containing the page ranks of the vertices 0, 1, 2, and 3 computed in Task 1.
df = ranksRDD2.toDF

In [64]:
# assuming the spark environemnt is set and sc is spark.sparkContext 
sqlContext = SQLContext(sc)
# Create dataframe from RDD
df = sqlContext.createDataFrame(ranksRDD2)
df.createOrReplaceTempView("ranksRDD2")

In [65]:
# df Type is PySpark SQL dataframe
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [69]:
# Show entire PySpark SQL dataframe
print(df.show())

+---+--------------------+
| _1|                  _2|
+---+--------------------+
|  2| 0.35880073041193344|
|  1|                0.03|
|  3|0.042749999999999996|
|  0| 0.36844926958806634|
+---+--------------------+

None


In [140]:
sqlc.sql("SELECT MAX(_2) as maxval FROM ranksRDD2").first().asDict()['maxval']

0.36844926958806634

In [114]:
# (b) Write a Spark SQL query to find the page rank of vertex 2. Print it to the screen.

# Filter column _1 equal to 2, and select only column 2
print(df.filter(df._1 == 2).select("_2").show())
sqlc.sql("SELECT MAX(_2) as maxval FROM ranksRDD2").first().asDict()['maxval']

+-------------------+
|                 _2|
+-------------------+
|0.35880073041193344|
+-------------------+

None


In [None]:
# (c) Write a Spark SQL query to find the vertex with the largest page rank. Print both the vertex ID and its page rank.

max_high = df.select(F.max(F.col('_2'))).collect()[0]['max(_2)']
df.filter(F.col('_2') == max_high).orderBy(F.col('_1').desc()).show()

In [139]:
df.registerTempTable("df_table")
sqlc.sql("SELECT MAX(_2) as maxval FROM df_table").first().asDict()['maxval']

0.36844926958806634