PageRank Algorithin PySpark Implementation
Submitted by -
Kriti Sharma,
Parvati Balulmath,
Vishnu Charan Golugula


In [59]:
#Installing the desired packages and frameworks
# Run the below code in order
# !sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz
!tar xf spark-3.3.3-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark

# spark-3.2.0-bin-hadoop3.2.tgz  # http://archive.apache.org/dist/spark/



In [60]:
# Importing Os environment
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.3-bin-hadoop3"

In [61]:
# importing python/pyspark modules to use them later in the functions,
#it is important to import modules to use there predined functions in the code
import findspark
findspark.init()
from pyspark import SparkContext
from google.colab import files
import numpy as np

In [62]:
# upload the input file
files.upload()

Saving nodes_input to nodes_input (2)


{'nodes_input (2)': b'n1 n2 n4\nn2 n3 n5\nn3 n4\nn4 n5\nn5 n1 n2 n3'}

In [64]:
# it initializes the spark environment
sc = SparkContext.getOrCreate()

rdd0 = sc.textFile("nodes_input")  # create rdd by using sc.textFile to read the content # the way to input the file may vary depening on the OS used
print(rdd0.collect())

['n1 n2 n4', 'n2 n3 n5', 'n3 n4', 'n4 n5', 'n5 n1 n2 n3']


In [65]:
# The below function is to parse the each line of the text file , it spilts by a space
def parseline(line):
    lineList = line.split(' ')
    return (lineList[0], lineList[1:]) # it returns in the form of (nodeId , itsneighbors)

In [66]:
linkRDD = rdd0.map(parseline) # using map function to create a key value pairs

In [67]:
# count the number of nodes in the rdd and grant the initial pageranks to each node
num = linkRDD.count()
init_rank = 1.0 / num

In [68]:
# use the map fuction to create pagerankRDD
pagerankRDD = linkRDD.map(lambda x: (x[0], init_rank))
prev_prRDD = pagerankRDD

In [69]:
# the below function is to update the pagerank value , adding the dangling node mass with it
def updatePagerank(pagerankRDD, dangling_node_m, num):
    delta = dangling_node_m / num
    updated_pagerankRDD = (
        pagerankRDD.mapValues(lambda pr: 0)  # setting PageRank to 0
        .union(pagerankRDD.join(linkRDD).flatMap(lambda x: [(end_node, x[1][0] / len(x[1][1])) for end_node in x[1][1]])).reduceByKey(lambda a, b: a + b).mapValues(lambda pr: pr + delta))
    return updated_pagerankRDD

In [70]:
# The below function calculates the overall Pagerank mass contributed by the dangling nodes in the graph
def dangling_nodemass(linkRDD, pagerankRDD):
    dangling_node_m = (
        linkRDD.filter(lambda x: len(x[1]) == 0).join(pagerankRDD).map(lambda x: x[1][1]).sum())
    return dangling_node_m

In [71]:
# The below is the main fucntion that call the each and every function to calcuate the final page rank vale and handle the dangling nodes mass
# initializing the values
max_loop_end_condition = 100
tolerance = 0.1
converge = float('inf')

for i in range(max_loop_end_condition):
    # dangling node mass
    dangling_node_m = dangling_nodemass(linkRDD, pagerankRDD)
    # updating pagerankRDD
    updated_pagerankRDD = updatePagerank(pagerankRDD, dangling_node_m, num)

    updated_convergence = (pagerankRDD.join(updated_pagerankRDD).mapValues(lambda x: np.abs(x[0] - x[1])).values().sum())

    pagerank_values = pagerankRDD.collect() # to print the prevpage rank values
    print(f"Iteration {i + 1}: Convergence = {updated_convergence}")
    print("--------------------------------------------------------")
    print("Previous PageRank:")
    print(prev_prRDD.collect())

    print("Current PageRank:")
    for n, pr in pagerank_values: # loop to print the current pagerank
        print(f"{n}: {pr}")

    # Check convergence
    if updated_convergence < tolerance: # if the
        converge = updated_convergence
        break

    prev_prRDD = pagerankRDD
    pagerankRDD = updated_pagerankRDD

pagerank_values = pagerankRDD.collect()
print("Final Pagerank:")
print(pagerank_values)

Iteration 1: Convergence = 0.4000000000000001
--------------------------------------------------------
Previous PageRank:
[('n1', 0.2), ('n2', 0.2), ('n3', 0.2), ('n4', 0.2), ('n5', 0.2)]
Current PageRank:
n1: 0.2
n2: 0.2
n3: 0.2
n4: 0.2
n5: 0.2
Iteration 2: Convergence = 0.2666666666666667
--------------------------------------------------------
Previous PageRank:
[('n1', 0.2), ('n2', 0.2), ('n3', 0.2), ('n4', 0.2), ('n5', 0.2)]
Current PageRank:
n4: 0.30000000000000004
n3: 0.16666666666666669
n1: 0.06666666666666667
n2: 0.16666666666666669
n5: 0.30000000000000004
Iteration 3: Convergence = 0.2333333333333334
--------------------------------------------------------
Previous PageRank:
[('n4', 0.30000000000000004), ('n3', 0.16666666666666669), ('n1', 0.06666666666666667), ('n2', 0.16666666666666669), ('n5', 0.30000000000000004)]
Current PageRank:
n3: 0.18333333333333335
n4: 0.2
n5: 0.3833333333333334
n2: 0.13333333333333336
n1: 0.10000000000000002
Iteration 4: Convergence = 0.1611111111