# PageRank
Implement the well-known PageRank algorithm on a distributed system. This is very important when dealing with a number of pages so large that the data cannot fit in the memory, or even on the hard drive, of a single computer. Here we use the iterative approach instead of a direct SVD approach. Initially all pages have weight 1 (or 1 / number of pages). To avoid dead ends in the network, at each step there is a probability `beta` of jumping to a random page, chosen from a uniform distribution (or from any distribution specified in the initial PageRank input).

In [1]:
from pyspark import SparkContext, SparkConf
# Spark entry point used by every cell below; created without an explicit
# configuration object.
sc = SparkContext()

In [2]:
# Read the transition matrix as raw text lines. Each line describes one page:
# the page name, a tab, then a comma-separated list of the pages it links to.
tm_file_RDD = sc.textFile('PageRank/transitionsmall.txt')
# Preview of the raw lines (brings the whole RDD back to the driver).
tm_file_RDD.collect()

[u'a\tb,c,d', u'b\ta,d', u'c\ta', u'd\tb,c']

In [4]:
def create_tm_pairs(line):
    """Expand one adjacency-list line into weighted transition pairs.

    A line consists of a page name, a tab character, and a comma-separated
    list of the pages it links to. Every listed target receives an equal
    share ``1 / number_of_targets`` of the source page's rank.

    Args:
        line: One text line of the transition file.

    Returns:
        A list of ``(page, (target, weight))`` tuples, one per target.
        Blank lines and pages without any target (dead ends) produce an
        empty list, so a ``flatMap`` over the file simply skips them.

    Raises:
        ValueError: If the line contains more than one tab separator.
    """
    fields = line.strip().split('\t')
    if len(fields) < 2:
        # Blank line, or a dead-end page whose trailing tab was stripped:
        # there is nothing to distribute rank to.
        return []
    start, targets = fields
    # Ignore empty fields (e.g. from a trailing comma) so they neither
    # become a bogus '' page nor dilute the weights of the real targets.
    ends = [end for end in targets.split(',') if end]
    if not ends:
        return []
    weight = 1.0 / len(ends)
    return [(start, (end, weight)) for end in ends]

# Flatten every adjacency-list line into (page, (target, weight)) pairs.
tm_RDD = tm_file_RDD.flatMap(create_tm_pairs)
# Preview of the resulting transition pairs.
tm_RDD.collect()

[(u'a', (u'b', 0.3333333333333333)),
 (u'a', (u'c', 0.3333333333333333)),
 (u'a', (u'd', 0.3333333333333333)),
 (u'b', (u'a', 0.5)),
 (u'b', (u'd', 0.5)),
 (u'c', (u'a', 1.0)),
 (u'd', (u'b', 0.5)),
 (u'd', (u'c', 0.5))]

In [10]:
# Number of multiplication steps performed by the loop further down.
iterations = 40
# Weight given to the random-jump term in pr_multiply; the propagated ranks
# are scaled by (1 - beta).
beta = 0.15    # 1 - damping factor

def element_mult(pair):
    """Distribute one page's rank over its outgoing links.

    Args:
        pair: A ``(page, values)`` tuple as produced by ``groupByKey``.
            ``values`` is an iterable mixing the page's current rank (any
            non-tuple value convertible to ``float``) with its
            ``(target, weight)`` transition tuples, in any order.

    Returns:
        A list of ``(target, rank * weight)`` tuples, one per distinct
        target. If no rank is present among the values, the rank is taken
        to be ``0.0``; if there are no transitions, the list is empty.
    """
    _key, values = pair
    start_weight = 0.0
    transition = {}
    for value in values:
        if isinstance(value, tuple):
            # Sum the weights of repeated links to the same target; plain
            # assignment would keep only one of them and lose rank mass.
            transition[value[0]] = transition.get(value[0], 0.0) + value[1]
        else:
            start_weight = float(value)
    # `.items()` works on both Python 2 and 3 (`.iteritems()` is 2-only).
    return [(dist, start_weight * rate) for dist, rate in transition.items()]

def pr_multiply(start_pr, tm, beta, uniform_pr):
    """Perform one PageRank step: propagate ranks, then mix in the jump term.

    Args:
        start_pr: RDD of ``(page, rank)`` pairs holding the current ranks.
        tm: RDD of ``(page, (target, weight))`` transition pairs.
        beta: Weight of the jump term; propagated ranks get ``1 - beta``.
        uniform_pr: RDD of ``(page, rank)`` pairs used as the jump
            distribution.

    Returns:
        RDD of ``(page, rank)`` pairs with the updated ranks.
    """
    def add(left, right):
        return left + right

    # Bring each page's rank and its outgoing transitions under one key.
    grouped = start_pr.union(tm).groupByKey()
    # Push rank along every link and total what arrives at each target.
    propagated = grouped.flatMap(element_mult).reduceByKey(add)

    followed_links = propagated.map(lambda kv: (kv[0], kv[1] * (1-beta)))
    random_jumps = uniform_pr.map(lambda kv: (kv[0], kv[1] * beta))
    return followed_links.union(random_jumps).reduceByKey(add)

def pr_split_line(line):
    """Parse one line of the rank file into a ``(page, rank)`` tuple.

    The line holds a page name and its rank separated by a single tab;
    surrounding whitespace is stripped and the rank is converted to float.
    """
    fields = line.strip().split('\t')
    # Exactly two fields are expected; anything else raises ValueError here.
    page, raw_rank = fields
    return page, float(raw_rank)
    
# One RDD per step; slot 0 holds the initial ranks read from file.
pr_RDDs = [None] * (iterations + 1)
pr_RDDs[0] = sc.textFile('PageRank/prsmall.txt').map(pr_split_line)
# `range` rather than the Python-2-only `xrange`, so the cell runs on both
# Python 2 and 3.
for i in range(iterations):
    # The initial ranks are also passed as the random-jump distribution.
    pr_RDDs[i+1] = pr_multiply(pr_RDDs[i], tm_RDD, beta, pr_RDDs[0])
# Show the ranks after the final step.
pr_RDDs[iterations].collect()

[(u'd', 0.22514619883040934),
 (u'c', 0.22514619883040934),
 (u'b', 0.22514619883040934),
 (u'a', 0.3245614035087717)]

In [11]:
# Save the ranks from the final step as text under the path PageRank/output.
pr_RDDs[iterations].saveAsTextFile("PageRank/output")