In [1]:
import os
import sys
spark_path = os.environ['SPARK_HOME']
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.9-src.zip")

import findspark
findspark.init()
import pyspark

In [2]:
number_cores = 6
memory_gb = 16
conf = (pyspark.SparkConf().setMaster('local[{}]'.format(number_cores)).set('spark.driver.memory', '{}g'.format(memory_gb)))
sc = pyspark.SparkContext(conf=conf)

In [3]:
!dir /users/trush/CSC496/PageRank

data  Lab2  spark-3.ipynb  spark-4.ipynb


In [4]:
! cat /users/trush/CSC496/PageRank/data/small_graph.dat

y y
y a
a y
a m
m a

In [5]:
graph_data = sc.textFile("/users/trush/CSC496/PageRank/data/small_graph.dat")
graph_data.take(10)

['y y', 'y a', 'a y', 'a m', 'm a']

In [6]:
# Incoming and outgoing links in a page
links = graph_data.map(lambda line: (line.split(" ")[0], line.split(" ")[1])) \
        .groupByKey() \
        .mapValues(list)
links.take(10)

[('y', ['y', 'a']), ('a', ['y', 'm']), ('m', ['a'])]

In [7]:
N = links.count()
ranks = links.map(lambda line: (line[0], 1/N))
ranks.take(N)

[('y', 0.3333333333333333),
 ('a', 0.3333333333333333),
 ('m', 0.3333333333333333)]

In [8]:
votes = ranks.join(links)
votes.take(N)

[('y', (0.3333333333333333, ['y', 'a'])),
 ('a', (0.3333333333333333, ['y', 'm'])),
 ('m', (0.3333333333333333, ['a']))]

In [9]:
def calculateVotes(t):
    res = []
    for item in t[1][1]:
        count = len(t[1][1])
        res.append((item, t[1][0]/count))
    return res
# Weight of votes from original source
calculateVotes(('y', (0.3333333333333333, ['y', 'a'])))

[('y', 0.16666666666666666), ('a', 0.16666666666666666)]

In [10]:
votes = ranks.join(links) \
        .flatMap(calculateVotes)
votes.collect()

[('y', 0.16666666666666666),
 ('a', 0.16666666666666666),
 ('y', 0.16666666666666666),
 ('m', 0.16666666666666666),
 ('a', 0.3333333333333333)]

In [11]:
ranks.collect()

[('y', 0.3333333333333333),
 ('a', 0.3333333333333333),
 ('m', 0.3333333333333333)]

In [12]:
ranks = votes.reduceByKey(lambda x, y: x + y)
ranks.collect()

[('y', 0.3333333333333333), ('a', 0.5), ('m', 0.16666666666666666)]

In [13]:
%%time
for i in range(10):
    votes = ranks.join(links) \
        .flatMap(calculateVotes)
    ranks = votes.reduceByKey(lambda x, y: x + y)
    print(ranks.collect())

[('y', 0.41666666666666663), ('a', 0.3333333333333333), ('m', 0.25)]
[('y', 0.375), ('a', 0.4583333333333333), ('m', 0.16666666666666666)]
[('y', 0.41666666666666663), ('m', 0.22916666666666666), ('a', 0.35416666666666663)]
[('y', 0.38541666666666663), ('a', 0.4375), ('m', 0.17708333333333331)]
[('m', 0.21875), ('y', 0.4114583333333333), ('a', 0.36979166666666663)]
[('y', 0.390625), ('a', 0.42447916666666663), ('m', 0.18489583333333331)]
[('m', 0.21223958333333331), ('y', 0.4075520833333333), ('a', 0.3802083333333333)]
[('m', 0.19010416666666666), ('y', 0.3938802083333333), ('a', 0.416015625)]
[('m', 0.2080078125), ('y', 0.40494791666666663), ('a', 0.3870442708333333)]
[('y', 0.39599609375), ('a', 0.4104817708333333), ('m', 0.19352213541666666)]
CPU times: user 565 ms, sys: 214 ms, total: 778 ms
Wall time: 10.5 s


In [14]:
%%time

N = links.count()
ranks = links.map(lambda line: (line[0], 1/N))
ranks.take(N)
sum = 1

while sum > 0.1:
    old_ranks = ranks
    votes = ranks.join(links) \
            .flatMap(calculateVotes)
    ranks = votes.reduceByKey(lambda x, y: x + y)
    errors = old_ranks.join(ranks).mapValues(lambda v: abs(v[0] - v[1]))
    sum = errors.values().sum()
    print(sum)
ranks.collect()

0.33333333333333337
0.33333333333333337
0.24999999999999997
0.20833333333333331
0.1666666666666667
0.13541666666666674
0.109375
0.08854166666666663
CPU times: user 829 ms, sys: 287 ms, total: 1.12 s
Wall time: 11.5 s


[('m', 0.21223958333333331),
 ('y', 0.4075520833333333),
 ('a', 0.3802083333333333)]