In [1]:
import os
import pyspark
import re
import sys
from operator import add
from pyspark.sql import SparkSession

In [2]:
# Point PySpark at the local Spark, Python and Java installations.
# NOTE(review): these are absolute paths on one developer machine; they
# unconditionally replace any values already present in the environment.
os.environ.update({
    "SPARK_HOME": "/Users/snell/Software/spark-3.1.1-bin-hadoop2.7",
    "PYSPARK_PYTHON": "/Users/snell/anaconda3/bin/python",
    "PYSPARK_DRIVER_PYTHON": "/Users/snell/anaconda3/bin/python",
    "JAVA_HOME": "/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/",
})

In [3]:
# Create (or reuse) the Spark session for this notebook and quiet the logs
# down to warnings and above.
spark = (
    SparkSession.builder
    .appName("PageRankExample")
    .getOrCreate()
)
spark.sparkContext.setLogLevel('WARN')

In [4]:
def computeContribs(urls, rank):
    """Split a page's rank evenly among the pages it links to.

    Lazily yields one ``(url, share)`` pair per entry of ``urls``, where
    ``share`` is ``rank / len(urls)``. ``urls`` must support ``len()`` and
    iteration. When ``urls`` is empty nothing is yielded and no division
    takes place.
    """
    fan_out = len(urls)
    # The division stays inside the generator so it only runs per yielded item.
    yield from ((target, rank / fan_out) for target in urls)

In [5]:
def parseNeighbors(urls):
    """Parse one whitespace-separated input line into a (url, neighbor) pair.

    Args:
        urls: A line of text whose first two whitespace-separated tokens are
            a page and one page it links to. Any further tokens are ignored.

    Returns:
        A tuple ``(first_token, second_token)``.

    Raises:
        IndexError: If the line holds fewer than two tokens.
    """
    # Strip first: re.split on a line with leading whitespace would otherwise
    # produce an empty first token and shift the pair by one position.
    parts = re.split(r'\s+', urls.strip())
    return parts[0], parts[1]

In [6]:
# Show the raw input file: one whitespace-separated "URL neighbor-URL" pair per line.
!cat pagerankinput.txt

n1 n2
n1 n4
n2 n3
n2 n5
n3 n4
n4 n5
n5 n1
n5 n2
n5 n3


In [7]:
# Path of the edge-list file loaded with textFile() further down.
inputfile = 'pagerankinput.txt'
# Number of passes made by the rank-update loop near the end of the notebook.
# The step-by-step walkthrough cells apply one additional update before that loop.
totiterations = 2

In [8]:
# Load the input file as an RDD of text lines. Each line should have the form:
#     URL         neighbor URL
#     URL         neighbor URL
#     URL         neighbor URL
#     ...
lines = spark.sparkContext.textFile(inputfile)
# collect() returns every line as a Python list; fine for this small sample file.
lines.collect()

['n1 n2',
 'n1 n4',
 'n2 n3',
 'n2 n5',
 'n3 n4',
 'n4 n5',
 'n5 n1',
 'n5 n2',
 'n5 n3']

In [9]:
# Parse every line into a (url, neighbor) pair, drop duplicate pairs, then
# gather each url's neighbors under a single key. The result is cached
# because the following cells reuse it several times.
links = (
    lines.map(parseNeighbors)
    .distinct()
    .groupByKey()
    .cache()
)
links.collect()

[('n1', <pyspark.resultiterable.ResultIterable at 0x7fea408425f8>),
 ('n4', <pyspark.resultiterable.ResultIterable at 0x7fea408425c0>),
 ('n5', <pyspark.resultiterable.ResultIterable at 0x7fea408427f0>),
 ('n2', <pyspark.resultiterable.ResultIterable at 0x7fea408426d8>),
 ('n3', <pyspark.resultiterable.ResultIterable at 0x7fea40842780>)]

In [10]:
# Let's turn each ResultIterable into a plain list just so we can see its contents.
linkslist = links.map(
    lambda page_neighbors: (page_neighbors[0], list(page_neighbors[1]))
)
linkslist.collect()

[('n1', ['n4', 'n2']),
 ('n4', ['n5']),
 ('n5', ['n1', 'n2', 'n3']),
 ('n2', ['n3', 'n5']),
 ('n3', ['n4'])]

In [11]:
# Initialize page ranks. To follow the slides, every page that appears as a
# key in `links` starts with a rank of 0.2.
initialval = 0.2
ranks = links.keys().map(lambda url: (url, initialval))
ranks.collect()

[('n1', 0.2), ('n4', 0.2), ('n5', 0.2), ('n2', 0.2), ('n3', 0.2)]

In [12]:
# Join the printable neighbor lists with the current ranks so each record is
# (page, (list of neighbor urls, page rank)). Used for inspection only.
linkranks = linkslist.join(ranks)
linkranks.collect()

[('n1', (['n4', 'n2'], 0.2)),
 ('n5', (['n1', 'n2', 'n3'], 0.2)),
 ('n4', (['n5'], 0.2)),
 ('n2', (['n3', 'n5'], 0.2)),
 ('n3', (['n4'], 0.2))]

In [13]:
# Each joined record has the shape (page, (neighbor urls, page rank)).
# Only the inner pair matters here: it is unpacked straight into
# computeContribs(urls, rank), which spreads the rank over the neighbors.
contribs = (
    links.join(ranks)
    .flatMap(lambda page_entry: computeContribs(*page_entry[1]))
)
contribs.collect()

[('n4', 0.1),
 ('n2', 0.1),
 ('n1', 0.06666666666666667),
 ('n2', 0.06666666666666667),
 ('n3', 0.06666666666666667),
 ('n5', 0.2),
 ('n3', 0.1),
 ('n5', 0.1),
 ('n4', 0.2)]

In [14]:
# Sum the contributions received by each page to get its new rank.
# NOTE: this rebinds `ranks`, so the update loop later in the notebook starts
# from these values rather than from the initial 0.2 ranks.
ranks = contribs.reduceByKey(add)
ranks.collect()

[('n1', 0.06666666666666667),
 ('n5', 0.30000000000000004),
 ('n4', 0.30000000000000004),
 ('n2', 0.16666666666666669),
 ('n3', 0.16666666666666669)]

In [15]:
# Repeatedly update the URL ranks with the PageRank algorithm, starting from
# whatever `ranks` currently holds.
#
# use_damping selects the update rule:
#   False -> new rank is the plain sum of contributions (follows the slides)
#   True  -> "naive Google" variant: sum * 0.85 + 0.15
use_damping = False

for iteration in range(totiterations):
    # Calculates URL contributions to the rank of other URLs.
    contribs = links.join(ranks).flatMap(
        lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))

    # Re-calculates URL ranks based on neighbor contributions.
    ranks = contribs.reduceByKey(add)
    if use_damping:
        ranks = ranks.mapValues(lambda rank: rank * 0.85 + 0.15)

In [16]:
# Ranks after the one walkthrough update plus `totiterations` loop passes.
ranks.collect()

[('n1', 0.1277777777777778),
 ('n3', 0.19444444444444448),
 ('n5', 0.2666666666666667),
 ('n4', 0.23333333333333336),
 ('n2', 0.1777777777777778)]

In [17]:
# Collect all URL ranks and print one line per page. The order is whatever
# collect() returns; it is not sorted by page or by rank.
for (link, rank) in ranks.collect():
    print("%s has rank: %s." % (link, rank))

n1 has rank: 0.1277777777777778.
n3 has rank: 0.19444444444444448.
n5 has rank: 0.2666666666666667.
n4 has rank: 0.23333333333333336.
n2 has rank: 0.1777777777777778.
