<a href="https://colab.research.google.com/github/vu-topics-in-big-data-2022/examples/blob/main/example-spark/pagerank.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#This is the page rank example taken from spark examples

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [3]:
#install spark. we are using the one that uses hadoop as the underlying scheduler.
!wget -q https://downloads.apache.org/spark//spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!ls -l


total 223372
drwxr-xr-x  1 root root      4096 Mar 25 13:38 sample_data
drwxr-xr-x 13 1000 1000      4096 Feb 22 02:11 spark-3.1.1-bin-hadoop3.2
-rw-r--r--  1 root root 228721937 Feb 22 02:45 spark-3.1.1-bin-hadoop3.2.tgz


In [4]:
#Provides findspark.init() to make pyspark importable as a regular library.
os.environ["SPARK_HOME"] = "spark-3.1.1-bin-hadoop3.2"
!pip install -q findspark
import findspark
findspark.init()

In [5]:
#The entry point to using Spark SQL is an object called SparkSession. It initiates a Spark Application which all the code for that Session will run on
# to larn more see https://blog.knoldus.com/spark-why-should-we-use-sparksession/
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()

# Page Rank example

In [6]:
!wget https://raw.githubusercontent.com/vu-topics-in-big-data-2022/examples/main/example-spark/data/pagerank_data.txt
!cat pagerank_data.txt


--2021-04-01 13:17:11--  https://raw.githubusercontent.com/vu-topics-in-big-data-2022/examples/main/example-spark/data/pagerank_data.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 24 [text/plain]
Saving to: ‘pagerank_data.txt’


2021-04-01 13:17:11 (1.13 MB/s) - ‘pagerank_data.txt’ saved [24/24]

1 2
1 3
1 4
2 1
3 1
4 1


In [7]:
lines = spark.read.text("pagerank_data.txt").rdd.map(lambda r: r[0])
lines.take(lines.count())

['1 2', '1 3', '1 4', '2 1', '3 1', '4 1']

In [8]:
import re
def parseNeighbors(urls):
    """Parses a urls pair string into urls pair."""
    parts = re.split(r'\s+', urls)
    print (parts)
    return parts[0], parts[1]

In [9]:
# Loads all URLs from input file and initialize their neighbors. 
# Mapping is transforming each RDD element using a function and returning a new RDD
links = lines.map(lambda x: parseNeighbors(x))
#so finally you get a set of rows, where each row is a tuple where the target is the second element in tuple
# and the src is the first element in the tuple. 
links.take(links.count())

[('1', '2'), ('1', '3'), ('1', '4'), ('2', '1'), ('3', '1'), ('4', '1')]

In [10]:
neighbors = lines.map(lambda urls: parseNeighbors(urls))
neighbors.take(links.count())

[('1', '2'), ('1', '3'), ('1', '4'), ('2', '1'), ('3', '1'), ('4', '1')]

In [11]:
#you can see the links as a table as well by printing it as a dataframe
neighbors.toDF().show()

+---+---+
| _1| _2|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  1|
|  3|  1|
|  4|  1|
+---+---+



In [12]:
# we will try to group the elements by key 
linksdistinct=lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey()
#when you print it - you will see that the element's second column is a result iterable.

linksdistinct.take(linksdistinct.count())

[('1', <pyspark.resultiterable.ResultIterable at 0x7f52f6e460d0>),
 ('2', <pyspark.resultiterable.ResultIterable at 0x7f52f6e46610>),
 ('3', <pyspark.resultiterable.ResultIterable at 0x7f52f6e46490>),
 ('4', <pyspark.resultiterable.ResultIterable at 0x7f52f6e46c10>)]

In [13]:
#to see the iterable you can use the following conversion
def see_iterable(iterable):
    r = []
    for v1_iterable in iterable:
        for v2 in v1_iterable:
            r.append(v2)

    return tuple(r)

In [14]:
#lets see the first element now
print(linksdistinct.take(1)[0][0]) #this is the key
print(linksdistinct.take(1)[0][1]) #this is the iterable
print(linksdistinct.take(1)[0][0],see_iterable(linksdistinct.take(1)[0][1]))
#you can see that it contains elements 2, 3,4
print(linksdistinct.take(4)[1][0],see_iterable(linksdistinct.take(2)[1][1]))
print(linksdistinct.take(4)[2][0],see_iterable(linksdistinct.take(3)[2][1]))
print(linksdistinct.take(4)[3][0],see_iterable(linksdistinct.take(4)[3][1]))
print("compare this to. And see what group by did. It brough all pages that brough to one and collected them")
neighbors.toDF().show()

1
<pyspark.resultiterable.ResultIterable object at 0x7f52f6e4bc50>
1 ('2', '3', '4')
2 ('1',)
3 ('1',)
4 ('1',)
compare this to. And see what group by did. It brough all pages that brough to one and collected them
+---+---+
| _1| _2|
+---+---+
|  1|  2|
|  1|  3|
|  1|  4|
|  2|  1|
|  3|  1|
|  4|  1|
+---+---+



In [15]:
#so again ensure linkdistinct is initations
linksdistinct=lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey()
#now we set up a rank matrix using the the 0th element from list of rows: for example linksdistinct.take(4)[1][0] - take all rows, pick first row and pick 0th element, linksdistinct.take(4)[2][0] will take the 0th element from the list that is the second row
#thus the following operation will get us a RDD where the rows are 1,2,3,4 and 1.0
ranks = linksdistinct.map(lambda url_neighbors: (url_neighbors[0], 1.0))
 # Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
print(ranks.collect())

[('1', 1.0), ('2', 1.0), ('3', 1.0), ('4', 1.0)]


In [16]:
#lets see the join operation. See how it aggregates by key
contribs = linksdistinct.join(ranks)
contribs.take(linksdistinct.count())

[('1', (<pyspark.resultiterable.ResultIterable at 0x7f52f6e5aa50>, 1.0)),
 ('4', (<pyspark.resultiterable.ResultIterable at 0x7f52f6e5a910>, 1.0)),
 ('2', (<pyspark.resultiterable.ResultIterable at 0x7f52f6e3a550>, 1.0)),
 ('3', (<pyspark.resultiterable.ResultIterable at 0x7f52f6e3a110>, 1.0))]

In [17]:
#again this is a pyspark iterable. So lets us the previous function.
for i in range(4):
  print(contribs.take(4)[i][0],see_iterable(contribs.take(4)[i][1][0])) #note that the value is a tuple of iterable and 1.0. hence there is one more index here in the value we pass to see_iterable.

1 ('2', '3', '4')
4 ('1',)
2 ('1',)
3 ('1',)


In [18]:
#On each iteration, have page p send a contribution of rank(p)/numNeighbors(p) to its neighbors (the pages it has links to).
#note if we process 1 ('2', '3', '4')
#then the rank of 1/3 is contributed towards the rank of 2,3 and 4
#hence if at start rank of 1 is 1.0
#then the output of this function will be
# 2,1/3
# 3,1/3
# 4,1/3
#note that this is only the first iteration
def computeContribs(urls, rank):
    """Calculates URL contributions to the rank of other URLs."""
    num_urls = len(urls)    
    for url in urls:
        yield (url, rank/num_urls)



In [19]:
#change iterations number to run the algo for multiple times.
iterations=1
#reinit rank
ranks = linksdistinct.map(lambda url_neighbors: (url_neighbors[0], 1.0))

from operator import add
# Calculates and updates URL ranks continuously using PageRank algorithm.
for iteration in range(iterations):
        # Calculates URL contributions to the rank of other URLs.
        contribs = linksdistinct.join(ranks).flatMap(lambda url_urls_rank: computeContribs(url_urls_rank[1][0] #this first element is the list for example. ('2', '3', '4') 
                                                                                           , url_urls_rank[1][1])) #the second element is the rank - at the start it is 1.0
        #comment the show method if you run for lot of iterations
        contribs.toDF().show(truncate=False)

        # Re-calculates URL ranks based on neighbor contributions.
        #note that updating the rank by multiplying by 0.85 and adding 0.15 is a heuristic
        ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
        #comment the show method if you run for lot of iterations
        print("rank")
        ranks.toDF().show(truncate=False)


+---+------------------+
|_1 |_2                |
+---+------------------+
|2  |0.3333333333333333|
|3  |0.3333333333333333|
|4  |0.3333333333333333|
|1  |1.0               |
|1  |1.0               |
|1  |1.0               |
+---+------------------+

rank
+---+-------------------+
|_1 |_2                 |
+---+-------------------+
|4  |0.43333333333333335|
|1  |2.6999999999999997 |
|2  |0.43333333333333335|
|3  |0.43333333333333335|
+---+-------------------+



In [20]:
#let us now run it 20 times but without prints
iterations=20
#reinit rank
ranks = linksdistinct.map(lambda url_neighbors: (url_neighbors[0], 1.0))
# Calculates and updates URL ranks continuously using PageRank algorithm.
for iteration in range(iterations):
        # Calculates URL contributions to the rank of other URLs.
        contribs = linksdistinct.join(ranks).flatMap(lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))
        ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)       
        

In [21]:
# Collects all URL ranks and dump them to console.
#see the lazy operation. it will take longer to execute this cell than the previous cell.
for (link, rank) in ranks.collect():
   print("%s has rank: %s." % (link, rank))


2 has rank: 0.7055659824943556.
1 has rank: 1.8833020525169324.
3 has rank: 0.7055659824943556.
4 has rank: 0.7055659824943556.
