In [1]:
# Show the raw input file stored in HDFS: one "page,neighbor" pair per line.
!hdfs dfs -cat /user/root/01_PageRank_data.csv

a,c
a,d
b,a
c,a
d,b
d,c


In [2]:
import findspark
# Must run before `import pyspark` (next cell): findspark.init() puts the
# local Spark installation's pyspark package on sys.path.
findspark.init()

In [3]:
import pyspark
# Use getOrCreate so this cell can be re-executed safely: constructing a second
# SparkContext while one is still active in the kernel raises ValueError.
# If a context already exists it is returned as-is (the conf is then ignored).
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName('PageRank'))

In [4]:
# Read the CSV from HDFS, then split every line into its comma-separated fields.
raw_lines = sc.textFile("hdfs:///user/root/01_PageRank_data.csv")
rdd_1 = raw_lines.map(lambda line: line.split(","))
rdd_1.collect()

[['a', 'c'], ['a', 'd'], ['b', 'a'], ['c', 'a'], ['d', 'b'], ['d', 'c']]

In [5]:
# Turn each [page, neighbor] list into a (page, neighbor) tuple so the RDD can
# be used with the key/value operations below (groupByKey, mapValues, ...).
rdd_2 = rdd_1.map(lambda fields: (fields[0], fields[1]))
rdd_2.collect()

[('a', 'c'), ('a', 'd'), ('b', 'a'), ('c', 'a'), ('d', 'b'), ('d', 'c')]

In [6]:
# Group the neighbors of each page under the page's key.
# The grouped links are the input of every later cell, so cache them: without
# it each action re-evaluates the lineage starting from the HDFS read.
rdd_3 = rdd_2.groupByKey().cache()
rdd_3.collect()

[('c', <pyspark.resultiterable.ResultIterable at 0x7fd87c1ed9e8>),
 ('b', <pyspark.resultiterable.ResultIterable at 0x7fd8746022e8>),
 ('d', <pyspark.resultiterable.ResultIterable at 0x7fd874602358>),
 ('a', <pyspark.resultiterable.ResultIterable at 0x7fd874602320>)]

In [7]:
# Display only: turn each grouped value into a plain list so the neighbors are
# readable instead of showing up as ResultIterable objects.
links_as_lists = rdd_3.mapValues(list)
links_as_lists.collect()

[('c', ['a']), ('b', ['a']), ('d', ['b', 'c']), ('a', ['c', 'd'])]

In [8]:
# 1 (Before iteration) Initialize each page's rank to 1.0
# Each value becomes a (neighbors, rank) pair.
rdd_4 = rdd_3.mapValues(lambda neighbors: (neighbors, 1.0))
rdd_4.collect()

[('c', (<pyspark.resultiterable.ResultIterable at 0x7fd87c03f320>, 1.0)),
 ('b', (<pyspark.resultiterable.ResultIterable at 0x7fd87c03a4a8>, 1.0)),
 ('d', (<pyspark.resultiterable.ResultIterable at 0x7fd87c03a240>, 1.0)),
 ('a', (<pyspark.resultiterable.ResultIterable at 0x7fd87c03a470>, 1.0))]

In [9]:
# Display only: materialize the neighbors of each (neighbors, rank) pair as a
# list so the collected output is readable.
ranked_links_as_lists = rdd_4.mapValues(
    lambda links_rank: (list(links_rank[0]), links_rank[1])
)
ranked_links_as_lists.collect()

[('c', (['a'], 1.0)),
 ('b', (['a'], 1.0)),
 ('d', (['b', 'c'], 1.0)),
 ('a', (['c', 'd'], 1.0))]

In [10]:
# 2 On iteration I, page P sends a contribution [rank(P)/numNeighbors(P)] to each of its neighbors
def f(neighbors, rank):
    """Yield one ``(neighbor, contribution)`` pair for every neighbor of a page.

    The page's rank is split evenly, so every neighbor receives
    ``rank / number_of_neighbors``.

    Args:
        neighbors: Iterable of the pages this page links to. Any iterable is
            accepted (list, ResultIterable, iterator, generator); it is
            materialized once so its length is known.
        rank: Current rank of the page (a number).

    Yields:
        ``(neighbor, contribution)`` tuples, in the iteration order of
        ``neighbors``. Nothing is yielded when ``neighbors`` is empty.
    """
    # Materialize once: supports iterables without __len__ and lets the
    # per-neighbor share be computed a single time instead of on every loop pass.
    targets = list(neighbors)
    if not targets:
        # No outgoing links: nothing to distribute (and avoids dividing by zero).
        return
    share = rank / len(targets)
    for neighbor in targets:
        yield (neighbor, share)

In [11]:
# Expand every (neighbors, rank) value into one (neighbor, contribution) record
# per neighbor; the key is still the sending page.
rdd_5 = rdd_4.flatMapValues(lambda links_rank: f(*links_rank))
rdd_5.collect()

[('c', ('a', 1.0)),
 ('b', ('a', 1.0)),
 ('d', ('b', 0.5)),
 ('d', ('c', 0.5)),
 ('a', ('c', 0.5)),
 ('a', ('d', 0.5))]

In [12]:
# Drop the sending page: each value is already the (neighbor, contribution)
# pair produced by f, so the values alone form the re-keyed RDD.
rdd_6 = rdd_5.values()
rdd_6.collect()

[('a', 1.0), ('a', 1.0), ('b', 0.5), ('c', 0.5), ('c', 0.5), ('d', 0.5)]

In [13]:
from operator import add

In [14]:
# Sum all contributions received by each page.
rdd_7 = rdd_6.reduceByKey(lambda total, contribution: total + contribution)
rdd_7.collect()

[('c', 1.0), ('b', 0.5), ('d', 0.5), ('a', 2.0)]

In [15]:
# 3 On iteration I, page P's new rank is 0.15 + 0.85 * (sum of the contributions it received)
rdd_8 = rdd_7.mapValues(lambda contribution_sum: 0.15 + 0.85 * contribution_sum)
rdd_8.collect()

[('c', 1.0), ('b', 0.575), ('d', 0.575), ('a', 1.8499999999999999)]

In [16]:
# Shut down the SparkContext created at the top of this notebook.
sc.stop()