Below is an outline of PySpark code for calculating the PageRank of a graph expressed in the edge vector representation.  For this part of the assignment:

1. Complete the code below
2. Write doc string comments for all functions documenting what they do
3. Test your code on other graphs we've worked with in previous assignments

I recommend reviewing (i) the Excel spreadsheet calculation we did to simulate the PageRank calculation and (ii) the slides on PageRank.  Ensure you understand the algorithm well before starting this exercise.

Rather than just stating "Write PySpark code to calculate PageRank", I've provided some helper functions and sample output along the way for guidance.  To facilitate grading, please keep these functions.

In [1]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()  # reuse the existing context if this cell is re-run

# page rank

In [2]:
def parse_line(line):
    '''
    This function will be mapped to all lines in the input graph file. It
    will be applied to each line in order to parse out the node and all
    its neighbors, which it returns as a tuple.
    '''
    x, *nbrs = line.split()
    return (x, nbrs)

In [3]:
def read_graph(fname):
    '''
    Reads in a graph file in incidence vector form, and returns an rdd
    containing all graph data as (node, [neighbors]) tuples.
    '''
    lines = sc.textFile(fname)
    rdd = lines.map(parse_line)
    return rdd

In [4]:
read_graph('graph-1.txt').collect()

[('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]
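
To see what parse_line does without starting Spark, the sketch below applies the same function to a few in-memory strings. The sample lines are an assumption: they are chosen to reproduce the graph-1.txt output shown above, since the file contents themselves are not shown in this notebook.

```python
def parse_line(line):
    """Split a whitespace-separated line into (node, [neighbors])."""
    x, *nbrs = line.split()
    return (x, nbrs)

# Hypothetical file contents, chosen to match the output above.
sample_lines = ["A B C", "B C", "C A", "D C"]
parsed = [parse_line(line) for line in sample_lines]
print(parsed)
# [('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]

# A line holding only a node name parses to an empty neighbor list.
print(parse_line("E"))  # ('E', [])
```

The starred assignment (x, *nbrs) always binds nbrs to a list, so a node without out-links yields [] rather than raising an error.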

In [5]:
def init_ranks(graph):
    '''
    Calculates the initial page ranks of the nodes in this graph
    as 1 / number_of_nodes. Returns an rdd with these values.
    '''
    length = graph.count()                          # get size of graph
    start_rank = 1/length                           # calculate rank
    return graph.map(lambda x: (x[0], start_rank))  # return values

In [6]:
g = read_graph('graph-1.txt')
init_ranks(g).collect()

[('A', 0.25), ('B', 0.25), ('C', 0.25), ('D', 0.25)]

In [7]:
def calc_partials(x):
    '''
    Takes an element of form (rank, [neighbors]) and 
    calculates the partial sums for these neighbor nodes.
    '''
    rank, nbrs = x            # get rank and neighbors
    partial = rank/len(nbrs)  # calculate partial
    return [(n, partial) for n in nbrs]
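
As a quick check of calc_partials, the plain-Python sketch below runs it on one (rank, [neighbors]) pair from graph-1.txt. It also shows one possible guard for a node with no out-links, where len(nbrs) is zero and the division would raise ZeroDivisionError. The name calc_partials_safe is hypothetical, and returning an empty list (which drops that node's rank instead of redistributing it) is only one of several ways to treat such nodes.

```python
def calc_partials(x):
    """Split a node's rank evenly among its out-neighbors."""
    rank, nbrs = x
    partial = rank / len(nbrs)
    return [(n, partial) for n in nbrs]


def calc_partials_safe(x):
    """Hypothetical variant: a node with no out-links contributes nothing."""
    rank, nbrs = x
    if not nbrs:
        return []
    return calc_partials(x)


print(calc_partials((0.25, ['B', 'C'])))  # [('B', 0.125), ('C', 0.125)]
print(calc_partials_safe((0.25, [])))     # []
```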

In [8]:
def calc_contribs(ranks, graph):
    '''
    This function calculates the contributions for each node
    from all of its neighbors. Since we have a reference rdd
    with the graph, we don't need to pass neighbors on.
    '''
    # get rank and list of neighbors all in one place
    joined = ranks.join(graph).map(lambda x: x[1])
    
    # now calculate the contributions to each neighbor with helper fn
    contribs = joined.flatMap(calc_partials)
    
    # make zero values for nodes that might be missing
    dummy = graph.map(lambda x: (x[0], 0))
    
    return contribs.union(dummy)

In [9]:
g = read_graph('graph-1.txt')
r = init_ranks(g)

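Note that the contributions also include A, B, C, and D each paired with 0.  Why?
--> So that nodes with no in-links (like D) don't get left behind! Without the zero entries, such a node would never appear as a key in the contributions and would drop out of the ranks.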
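
The effect of the zero entries can be checked in plain Python. In the sketch below, sum_by_key is a hypothetical stand-in for reduceByKey(lambda x, y: x + y), and the contribution values are the ones graph-1.txt produces when every rank is 0.25.

```python
from collections import defaultdict


def sum_by_key(pairs):
    """Plain-Python stand-in for reduceByKey(lambda x, y: x + y)."""
    totals = defaultdict(float)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


# Contributions from graph-1.txt with all ranks at 0.25.
contribs = [('B', 0.125), ('C', 0.125), ('C', 0.25), ('A', 0.25), ('C', 0.25)]
# Zero-valued placeholders, one per node.
dummy = [('A', 0), ('B', 0), ('C', 0), ('D', 0)]

without_dummy = sum_by_key(contribs)
with_dummy = sum_by_key(contribs + dummy)

print(sorted(without_dummy))  # ['A', 'B', 'C']
print(sorted(with_dummy))     # ['A', 'B', 'C', 'D']
```

Nothing links to D, so D only survives the reduction when its placeholder is present. The placeholders add 0 to every other node, so the sums are unchanged.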

In [10]:
c1 = calc_contribs(r, g)
c1.collect()

[('B', 0.125),
 ('C', 0.125),
 ('C', 0.25),
 ('A', 0.25),
 ('C', 0.25),
 ('A', 0),
 ('B', 0),
 ('C', 0),
 ('D', 0)]

In [11]:
def calc_ranks(contribs, num_nodes, beta):
    '''
    This function will calculate the new ranks for each node in
    contribs according to the page rank algorithm. Returns an
    RDD with one element per node, with its new rank.
    '''
    # compute first term 
    first_term = (1-beta)/num_nodes
    
    # compute second term for all nodes
    seconds = contribs.reduceByKey(lambda x,y: x+y)
    
    # now finish computation for all nodes
    new_ranks = seconds.map(lambda x: (x[0], (x[1]*beta + first_term)))
    return new_ranks

In [12]:
calc_ranks(c1, num_nodes=g.count(), beta=0.85).collect()

[('C', 0.56875), ('A', 0.25), ('B', 0.14375), ('D', 0.037500000000000006)]
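
The update applied by calc_ranks is new_rank = (1 - beta)/num_nodes + beta * (sum of contributions). As a sanity check, the sketch below redoes that arithmetic in plain Python for graph-1.txt, with the summed contributions copied from the calc_contribs output above. The dictionary names are illustrative only.

```python
beta = 0.85
num_nodes = 4
first_term = (1 - beta) / num_nodes

# Summed contributions per node, taken from the calc_contribs output above.
summed = {'A': 0.25, 'B': 0.125, 'C': 0.125 + 0.25 + 0.25, 'D': 0}

new_ranks = {node: beta * total + first_term for node, total in summed.items()}
for node in sorted(new_ranks):
    print(node, round(new_ranks[node], 5))
# A 0.25
# B 0.14375
# C 0.56875
# D 0.0375
```

The ranks still sum to 1 after the update, which is a useful invariant to check when every node has at least one out-link.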

In [13]:
def pagerank(fname, beta=0.85, n=10):
    '''
    This function puts together the whole pagerank process:
        1) Read the graph and initialize the ranks.
        2) Compute the contributions and calculate new ranks.
        3) Repeat step 2 for n iterations.
    '''
    # Step 1
    g = read_graph(fname)
    r = init_ranks(g)
    num_nodes = g.count()
    
    # Steps 2 and 3
    for _ in range(n):
        c = calc_contribs(r, g)
        r = calc_ranks(c, num_nodes, beta)
        
    return r.collect()  # collect() already returns a list

In [14]:
sorted(pagerank('graph-1.txt', beta=0.85, n=10))

[('A', 0.375054382302053),
 ('B', 0.1949370588413849),
 ('C', 0.3925085588565621),
 ('D', 0.037500000000000006)]
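
One way to cross-check the Spark result is a small reference implementation in plain Python. The sketch below is not part of the assignment code: pagerank_reference is a hypothetical name, the graph is passed as a dict instead of a file, and it assumes every node appears as a key and has at least one out-link.

```python
def pagerank_reference(graph, beta=0.85, n=10):
    """Plain-Python PageRank over a dict of node -> list of out-neighbors.

    Assumes every node appears as a key and has at least one out-link.
    """
    num_nodes = len(graph)
    ranks = {node: 1 / num_nodes for node in graph}
    for _ in range(n):
        # Start every node at zero so nodes without in-links are kept.
        sums = {node: 0.0 for node in graph}
        for node, nbrs in graph.items():
            share = ranks[node] / len(nbrs)
            for nbr in nbrs:
                sums[nbr] += share
        ranks = {node: beta * total + (1 - beta) / num_nodes
                 for node, total in sums.items()}
    return ranks


graph_1 = {'A': ['B', 'C'], 'B': ['C'], 'C': ['A'], 'D': ['C']}
one_step = pagerank_reference(graph_1, n=1)
ten_steps = pagerank_reference(graph_1, n=10)

for node in sorted(ten_steps):
    print(node, round(ten_steps[node], 6))
# A 0.375054
# B 0.194937
# C 0.392509
# D 0.0375
```

With n=1 this reproduces the calc_ranks output shown earlier, and with n=10 it agrees with the values above up to floating-point rounding.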

In [23]:
%%time
sorted(pagerank('graph-2.txt', beta=0.85, n=10))

Wall time: 8min 4s


[('A', 0.3552329235133619),
 ('B', 0.18087715033826585),
 ('C', 0.26671408227310545),
 ('D', 0.08133891194042742),
 ('E', 0.11583693193483957)]

In [36]:
%%time
sorted(pagerank('wikipedia-example.txt', beta=0.85, n=10))

Wall time: 14.5 s


[('A', 0.052272727272727276),
 ('B', 0.31757575757575757),
 ('C', 0.09863636363636365),
 ('D', 0.047121212121212126),
 ('E', 0.3304545454545455),
 ('F', 0.047121212121212126),
 ('G', 0.021363636363636366),
 ('H', 0.021363636363636366),
 ('I', 0.021363636363636366),
 ('J', 0.021363636363636366),
 ('K', 0.021363636363636366)]