Below is the outline of PySpark code for calculating the PageRank of a graph expressed in the edge vector representation. For this part of the assignment:

1. Complete the code below
2. Write doc string comments for all functions documenting what they do
3. Test your code on other graphs we've worked with in previous assignments

I recommend reviewing (i) the Excel spreadsheet calculation we did to simulate the PageRank calculation and (ii) the slides on PageRank. Make sure you understand the algorithm well before starting on this exercise.

Rather than just stating "Write PySpark code to calculate PageRank", I've provided some helper functions and sample output along the way as guidance. To facilitate grading, please stick with these functions.

In [1]:
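import findspark
findspark.init()          # make the local Spark installation importable as pyspark
from pyspark import SparkContext
sc = SparkContext()       # a single SparkContext shared by all the cells below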

# PageRank

In [2]:
def parse_line(line):
    '''
    Input: One line of the input file, e.g. 'A B C'
    Output: A tuple (node, [neighbours]), e.g. ('A', ['B', 'C'])
    Function: Splits the line on whitespace. The first token is the source node and
    the remaining tokens are the nodes it links to (an empty list if it has none).
    '''
    tokens = line.split()
    return (tokens[0], tokens[1:])

In [3]:
parse_line('A B C')

('A', ['B', 'C'])
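One pitfall worth checking: sc.textFile strips the newline from each line, but a trailing or doubled space in a graph file would make split(' ') return an empty string, which parse_line would then treat as a neighbour. Calling split() with no argument splits on runs of whitespace and avoids this. A quick plain-Python comparison on a hypothetical input line:

```python
# Hypothetical input line with a trailing space (e.g. from a hand-edited file)
line = 'A B C '

print(line.split(' '))  # ['A', 'B', 'C', '']  -> '' would become a bogus neighbour
print(line.split())     # ['A', 'B', 'C']      -> runs of whitespace are ignored
```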

In [4]:
def read_graph(fname):
    '''
    Input: The file name of the input graph, one node per line followed by its neighbours
    Output: An RDD of (node, [neighbours]) tuples
    Function: Reads the file with sc.textFile and parses every line with parse_line.
    '''
    lines = sc.textFile(fname)
    return lines.map(parse_line)

In [6]:
read_graph('graph-1.txt').collect()

[('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]

In init_ranks you will need to use what is termed a "broadcast" variable. Read up on these online; the following links are a good place to start:
[link 1](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-broadcast.html), [link 2](https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/when_to_use_broadcast_variable.html), [link 3](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#shared-variables)

In [7]:
def init_ranks(graph):
    '''
    Input: The graph RDD of (node, [neighbours]) tuples returned by read_graph
    Output: An RDD of (node, rank) tuples
    Function: Assigns every node the same initial rank 1/N, where N is the number of
    nodes (lines) in the graph. The value 1/N is shared with the workers as a
    broadcast variable.
    '''
    init_rank = sc.broadcast(1.0 / graph.count())
    return graph.map(lambda x: (x[0], init_rank.value))

In [8]:
g = read_graph('graph-1.txt')
init_ranks(g).collect()

[('A', 0.25), ('B', 0.25), ('C', 0.25), ('D', 0.25)]

In [9]:
def calc_partials(x):
    '''
    Input: A tuple (neighbours, rank) for one node, i.e. the value part of a
    record produced by graph.join(ranks)
    Output: A generator of (neighbour, rank / out_degree) tuples
    Function: Splits the node's current rank evenly among the nodes it links to;
    each tuple is the partial rank that neighbour receives in this iteration.
    '''
    neigh, rank = x
    return ((y, rank / len(neigh)) for y in neigh)
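Inside calc_contribs, graph.join(ranks) produces records such as ('A', (['B', 'C'], 0.25)), and only the value part x[1] is handed to calc_partials. The plain-Python sketch below (no Spark needed) copies calc_partials and applies it to that record for node A; the record is written out by hand from the join semantics, so treat it as an illustration rather than captured Spark output.

```python
def calc_partials(x):
    # x is (neighbours, rank): split the rank evenly over the out-links
    neigh, rank = x
    return ((y, rank / len(neigh)) for y in neigh)

# One record as graph.join(ranks) would produce it for node A of graph-1.txt
record = ('A', (['B', 'C'], 0.25))

partials = list(calc_partials(record[1]))
print(partials)                          # [('B', 0.125), ('C', 0.125)]

# A node with no out-links yields nothing, and no division by zero occurs,
# because the generator body never runs for an empty neighbour list
print(list(calc_partials(([], 0.25))))   # []
```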

In [10]:
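def calc_contribs(ranks, graph):
    '''
    Input: ranks, an RDD of (node, rank) tuples; graph, an RDD of (node, [neighbours]) tuples
    Output: An RDD of (node, contribution) tuples
    Function: Joins each node's neighbour list with its current rank and emits the
    partial rank it sends to each neighbour. Every node is also paired with 0.0 so
    that nodes without incoming links still appear in the result.
    '''
    joined = graph.join(ranks)                                  # (node, ([neighbours], rank))
    partials = joined.flatMap(lambda x: calc_partials(x[1]))    # (neighbour, share of rank)
    zeros = joined.map(lambda x: (x[0], 0.0))                   # (node, 0.0) for every node
    return partials.union(zeros)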

In [11]:
g = read_graph('graph-1.txt')
r = init_ranks(g)

Note that when calculating the contributions, A, B, C, and D are also each paired with 0. Why?

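Every node is paired with 0.0 so that it still appears in the contributions RDD even when no other node links to it. In graph-1.txt, D has no incoming links: without the (D, 0.0) pair, groupByKey in calc_ranks would have no entry for D, and D would drop out of the ranking instead of receiving the (1 - beta)/N teleport term.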
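To see the effect without a cluster, the sketch below mimics calc_contribs and the grouping step of calc_ranks in plain Python for graph-1.txt, with the adjacency hard-coded from the read_graph output above. The helpers contribs_local and sum_by_key are hypothetical stand-ins for the RDD operations, not part of the assignment.

```python
from collections import defaultdict

# graph-1.txt as returned by read_graph, and the initial ranks from init_ranks
graph = [('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]
ranks = {node: 0.25 for node, _ in graph}

def contribs_local(graph, ranks, pad_with_zeros):
    """Hypothetical plain-Python stand-in for calc_contribs."""
    out = [(nb, ranks[node] / len(neigh)) for node, neigh in graph for nb in neigh]
    if pad_with_zeros:
        out += [(node, 0.0) for node, _ in graph]
    return out

def sum_by_key(pairs):
    """Stand-in for groupByKey followed by sum: total contribution per node."""
    totals = defaultdict(float)
    for node, value in pairs:
        totals[node] += value
    return dict(totals)

without_zeros = sum_by_key(contribs_local(graph, ranks, pad_with_zeros=False))
with_zeros = sum_by_key(contribs_local(graph, ranks, pad_with_zeros=True))

print(sorted(without_zeros))  # ['A', 'B', 'C']  (D has vanished)
print(sorted(with_zeros))     # ['A', 'B', 'C', 'D']
print(with_zeros['D'])        # 0.0, so calc_ranks gives D exactly (1 - beta) / N
```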

In [12]:
c1 = calc_contribs(r, g)
c1.collect()

[('A', 0.25),
 ('B', 0.125),
 ('C', 0.125),
 ('C', 0.25),
 ('C', 0.25),
 ('C', 0.0),
 ('A', 0.0),
 ('B', 0.0),
 ('D', 0.0)]

In [17]:
def calc_ranks(contribs, num_nodes, beta):
    '''
    Input: contribs, an RDD of (node, contribution) tuples from calc_contribs;
    num_nodes, the number of nodes N; beta, the damping factor
    Output: An RDD of (node, new_rank) tuples
    Function: Sums the contributions received by each node and applies the damped
    update new_rank = (1 - beta) / N + beta * sum(contributions).
    '''
    grouped = contribs.groupByKey()
    return grouped.mapValues(lambda vals: (1 - beta) / num_nodes + beta * sum(vals))

In [18]:
num_nodes=g.count()
calc_ranks(c1, num_nodes , beta=0.85).collect()

[('A', 0.25), ('D', 0.037500000000000006), ('C', 0.56875), ('B', 0.14375)]
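For reference, calc_ranks implements the damped update below, where N is the number of nodes, beta the damping factor, and d_i the number of out-links of node i (this notation is introduced here and may differ from the slides). Checking it against the output above for node C after one iteration:

$$
r_j^{(t+1)} = \frac{1-\beta}{N} + \beta \sum_{i \to j} \frac{r_i^{(t)}}{d_i}
$$

$$
r_C^{(1)} = \frac{1-0.85}{4} + 0.85\,(0.125 + 0.25 + 0.25) = 0.0375 + 0.53125 = 0.56875
$$

Node D receives no contributions, so its rank is just the teleport term (1 - 0.85)/4 = 0.0375; the trailing digits in 0.037500000000000006 are floating-point rounding.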

In [19]:
def pagerank(fname, beta=0.85, n=10):
    '''
    Input: fname, the graph file; beta, the damping factor; n, the number of iterations
    Output: A list of (node, rank) tuples after n iterations
    Function: Reads the graph, initialises every node to rank 1/N, and then applies
    n rounds of calc_contribs followed by calc_ranks.
    '''
    g = read_graph(fname).cache()   # reused in every iteration, so keep it in memory
    r = init_ranks(g)
    num = g.count()
    for _ in range(n):
        c = calc_contribs(r, g)
        r = calc_ranks(c, num, beta)
    return r.collect()

In [16]:
sorted(pagerank('graph-1.txt', beta=0.85, n=10))

[('A', 0.375054382302053), ('B', 0.1949370588413849), ('C', 0.3925085588565621), ('D', 0.037500000000000006)]
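One way to check the Spark result is a plain-Python version of the same iteration (no Spark, no broadcast). The sketch below is a hypothetical cross-check; pagerank_local is not part of the assignment. It hard-codes graph-1.txt from the read_graph output and should agree with the list above up to floating-point rounding.

```python
from collections import defaultdict

def pagerank_local(graph, beta=0.85, n=10):
    """Plain-Python mirror of pagerank(); graph is a list of (node, [neighbours])."""
    num_nodes = len(graph)
    ranks = {node: 1.0 / num_nodes for node, _ in graph}     # as in init_ranks
    for _ in range(n):
        sums = defaultdict(float)
        for node, neigh in graph:
            sums[node] += 0.0                                # the (node, 0.0) padding
            for nb in neigh:                                 # as in calc_partials
                sums[nb] += ranks[node] / len(neigh)
        # as in calc_ranks: damped update over the summed contributions
        ranks = {node: (1 - beta) / num_nodes + beta * s for node, s in sums.items()}
    return sorted(ranks.items())

graph_1 = [('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]
print(pagerank_local(graph_1))
```

Because it uses dictionaries instead of RDDs, this version is only practical for small graphs, but it makes each step of the Spark pipeline easy to inspect.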


In [17]:
sorted(pagerank('graph-2.txt', beta=0.85, n=10))

[('A', 0.3552329235133619), ('B', 0.18087715033826585), ('C', 0.26671408227310545), ('D', 0.08133891194042742), ('E', 0.11583693193483957)]


In [18]:
sorted(pagerank('wikipedia-example.txt', beta=0.85, n=10))

[('A', 0.027646509310404344), ('B', 0.3036191818465732), ('C', 0.30972957233259435), ('D', 0.03297885857776607), ('E', 0.06821469112858616), ('F', 0.03297885857776607), ('G', 0.01363636363636364), ('H', 0.01363636363636364), ('I', 0.01363636363636364), ('J', 0.01363636363636364), ('K', 0.01363636363636364)]
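A quick sanity check when testing on other graphs is the total rank. The sketch below sums the three lists printed above (values copied by hand): graph-1.txt and graph-2.txt total 1, while wikipedia-example.txt totals about 0.843. With the update in calc_ranks, the total stays at 1 only if every node passes its rank on; a node with no outgoing links in the file passes nothing on, so beta times its rank is lost each iteration. The shortfall therefore suggests the Wikipedia example contains at least one such node, which reflects how the code as written treats those nodes rather than an error in the Spark steps.

```python
# Rank lists printed by pagerank() above, copied verbatim
results = {
    'graph-1': [('A', 0.375054382302053), ('B', 0.1949370588413849),
                ('C', 0.3925085588565621), ('D', 0.037500000000000006)],
    'graph-2': [('A', 0.3552329235133619), ('B', 0.18087715033826585),
                ('C', 0.26671408227310545), ('D', 0.08133891194042742),
                ('E', 0.11583693193483957)],
    'wikipedia-example': [('A', 0.027646509310404344), ('B', 0.3036191818465732),
                          ('C', 0.30972957233259435), ('D', 0.03297885857776607),
                          ('E', 0.06821469112858616), ('F', 0.03297885857776607),
                          ('G', 0.01363636363636364), ('H', 0.01363636363636364),
                          ('I', 0.01363636363636364), ('J', 0.01363636363636364),
                          ('K', 0.01363636363636364)],
}

# Total rank per graph: 1.0 means no rank was lost during the iterations
totals = {name: sum(rank for _, rank in ranks) for name, ranks in results.items()}
for name, total in totals.items():
    print(f'{name}: {total:.4f}')
```

The same results also show that G through K each equal (1 - 0.85)/11, i.e. they receive no contributions and keep only the teleport term.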
