## Shikha Goel - shikhag

Below is the outline of PySpark code for calculating the PageRank of a graph expressed in the edge vector representation.  For this part of the assignment:

1. Complete the code below
2. Write docstrings for all functions documenting what they do
3. Test your code on other graphs we've worked with in previous assignments

I recommend reviewing (i) the Excel spreadsheet calculation we did to simulate the PageRank calculation and (ii) the slides on PageRank.  Make sure you understand the algorithm well before starting on this exercise.

Rather than just stating "Write PySpark code to calculate PageRank", I've provided some helper functions and sample output along the way to provide guidance.  To facilitate grading, please stay with these functions.

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.3.4/spark-2.3.4-bin-hadoop2.7.tgz
!tar xf spark-2.3.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.4-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext() 

# PageRank

In [0]:
def parse_line(line):
    '''
    Takes a single line of the vector incidence representation and
    returns a tuple of the page and a list of its outlinks.
    '''
    x,*y = line.strip().split()
    return (x,y)
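
A quick check of `parse_line` on plain strings, without Spark, can help confirm what the starred assignment does. This is only a sketch: the sample lines are assumptions modelled on the `graph-1.txt` output shown later in the notebook, and `E` is a hypothetical page with no outlinks.

```python
def parse_line(line):
    # Same logic as the cell above: first token is the page, the rest are its outlinks.
    x, *y = line.strip().split()
    return (x, y)

# Assumed sample lines; "E" is a hypothetical page with no outlinks.
samples = ["A B C", "C A\n", "E"]
parsed = [parse_line(s) for s in samples]
print(parsed)
```

A page with no outlinks comes back with an empty list rather than raising an error, which matters for the later steps.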

In [0]:
def read_graph(fname):
    '''
    Takes the name of the file holding the vector incidence representation and
    returns an RDD of (page, out_links) tuples.
    '''
    lines = sc.textFile(fname)
    return lines.map(parse_line)

In [6]:
read_graph('graph-1.txt').collect()

[('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]

In [0]:
def init_ranks(graph):
    '''
    Assigns an initial rank of 1/(number of nodes) to each node (page) and
    returns an RDD of (page, rank) tuples.
    '''
    nodes = graph.count()
    return graph.map(lambda tup : (tup[0],1/nodes))

In [8]:
g = read_graph('graph-1.txt')
init_ranks(g).collect()

[('A', 0.25), ('B', 0.25), ('C', 0.25), ('D', 0.25)]

In [0]:
def calc_partials(x):
    '''
    Takes one (page, (rank, out_links)) record and splits the page's rank evenly
    across its outlinks. Returns a Python list of (outlink, partial rank) tuples,
    one for each node that the page contributes to.
    '''
    page,(rank,out_links) = x
    partial = [(node,rank/len(out_links)) for node in out_links] 
    return partial
        
#calc_partials(u.take(1)[0])    
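
To make the input shape concrete, here is a small sketch that calls `calc_partials` on hand-built records of the form produced by `ranks.join(graph)`, with no Spark involved. The record for `A` uses the initial rank from `graph-1.txt`; the page `E` with no outlinks is hypothetical.

```python
def calc_partials(x):
    # Same logic as the cell above.
    page, (rank, out_links) = x
    return [(node, rank / len(out_links)) for node in out_links]

# One record after ranks.join(graph) has the shape (page, (rank, out_links)).
joined_a = ("A", (0.25, ["B", "C"]))
partials_a = calc_partials(joined_a)

# Hypothetical page with no outlinks: the comprehension body never runs,
# so there is no division by zero and nothing is contributed.
partials_dangling = calc_partials(("E", (0.25, [])))
print(partials_a, partials_dangling)
```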

In [0]:
def calc_contribs(ranks, graph):
    '''
    Joins the ranks with the graph and returns an RDD of (node, partial rank)
    tuples, one for every link into a node from a page that contributes to its
    PageRank. Each node is also paired with 0.0 so that nodes with no incoming
    links still appear in the output.
    '''
    j = ranks.join(graph)
    pair_with_zero = ranks.map(lambda x: (x[0],0.0))
    partials = j.flatMap(calc_partials)
    return partials.union(pair_with_zero)
        

In [0]:
g = read_graph('graph-1.txt')
r = init_ranks(g)

Note that when calculating the contributions, A, B, C, and D are each also paired with 0.  Why?

In [12]:
c1 = calc_contribs(r, g)
c1.collect()

[('B', 0.125),
 ('C', 0.125),
 ('C', 0.25),
 ('A', 0.25),
 ('C', 0.25),
 ('A', 0.0),
 ('B', 0.0),
 ('C', 0.0),
 ('D', 0.0)]
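
One way to see why: D has no incoming links, so it never appears as a key among the partial contributions. The sketch below uses a plain-Python grouping function (`group_sum`, a hypothetical stand-in for `groupByKey` plus a sum, not Spark itself) to show D dropping out unless the zero pairs are added.

```python
from collections import defaultdict

def group_sum(pairs):
    # Plain-Python stand-in for groupByKey followed by a sum per key.
    totals = defaultdict(float)
    for node, value in pairs:
        totals[node] += value
    return dict(totals)

# Partial contributions copied from the output above.
partials = [("B", 0.125), ("C", 0.125), ("C", 0.25), ("A", 0.25), ("C", 0.25)]
zeros = [(node, 0.0) for node in "ABCD"]

without_zeros = group_sum(partials)
with_zeros = group_sum(partials + zeros)
print(sorted(without_zeros))  # keys present without the zero pairs
print(sorted(with_zeros))     # keys present with the zero pairs
```

Without the zero pairs, D would be missing from the next set of ranks and would never receive its (1 - beta)/N share.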

In [0]:
def calc_ranks(contribs, num_nodes, beta):
    '''
    Takes the RDD of partial rank contributions, the number of nodes and beta
    as input, groups the contributions by page, and calculates each page's rank
    as (1 - beta)/num_nodes + beta * (sum of its contributions).
    '''
    return contribs.groupByKey().map(lambda z : (z[0] ,(1-beta)/num_nodes + beta*sum(z[1])))
    

In [14]:
calc_ranks(c1, num_nodes=g.count(), beta=0.85).collect()

[('C', 0.56875), ('A', 0.25), ('B', 0.14375), ('D', 0.037500000000000006)]
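
These values can be checked by hand. For each page, `calc_ranks` computes (1 - beta)/N + beta * (sum of contributions). The sketch below applies that formula to the contributions from `c1`, using a plain dictionary in place of the RDD.

```python
beta, num_nodes = 0.85, 4

# Contributions per page, copied from the c1 output above.
contribs = {
    "A": [0.25, 0.0],
    "B": [0.125, 0.0],
    "C": [0.125, 0.25, 0.25, 0.0],
    "D": [0.0],
}

ranks = {page: (1 - beta) / num_nodes + beta * sum(vals)
         for page, vals in contribs.items()}
print(ranks)
```

For C, this is 0.0375 + 0.85 * 0.625 = 0.56875, matching the Spark output.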

In [0]:
def pagerank(fname, beta=0.85, n=10):
    '''
    Calculates the PageRank of each page by applying the PageRank formula n times.
    Takes the filename of the vector incidence representation, beta and n as input
    and returns a list of (page, pagerank) tuples for all pages.
    '''
    g = read_graph(fname).collect()
    num_nodes = len(g)  # one line per page, so this equals the node count
    r = init_ranks(sc.parallelize(g)).collect()
    for i in range(n):
        c = calc_contribs(sc.parallelize(r), sc.parallelize(g)).collect()
        r = calc_ranks(sc.parallelize(c), num_nodes, beta).collect()
    return r

In [28]:
sorted(pagerank('graph-1.txt', beta=0.85, n=10))


[('A', 0.375054382302053),
 ('B', 0.1949370588413849),
 ('C', 0.3925085588565621),
 ('D', 0.037500000000000006)]
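
As a cross-check on the Spark result, the same update can be written in plain Python for small graphs. This is a sketch, not part of the assignment: `pagerank_reference` is a hypothetical helper, and it assumes every page appears as a key in the dictionary. The adjacency for `graph_1` is taken from the `read_graph('graph-1.txt')` output earlier in the notebook.

```python
def pagerank_reference(graph, beta=0.85, n=10):
    """Plain-Python PageRank for cross-checking small graphs.

    graph maps each page to its list of outlinks; every page must be a key.
    """
    num_nodes = len(graph)
    ranks = {page: 1 / num_nodes for page in graph}
    for _ in range(n):
        # Starting every total at 0.0 plays the role of the zero pairs.
        totals = {page: 0.0 for page in graph}
        for page, out_links in graph.items():
            for node in out_links:
                totals[node] += ranks[page] / len(out_links)
        ranks = {page: (1 - beta) / num_nodes + beta * totals[page]
                 for page in graph}
    return ranks

graph_1 = {"A": ["B", "C"], "B": ["C"], "C": ["A"], "D": ["C"]}
ref = pagerank_reference(graph_1)
print(sorted(ref.items()))
```

The values should agree with the Spark output above up to floating-point rounding.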

In [29]:
sorted(pagerank('graph-2.txt', beta=0.85, n=10))

[('A', 0.3552329235133619),
 ('B', 0.18087715033826585),
 ('C', 0.26671408227310545),
 ('D', 0.08133891194042742),
 ('E', 0.11583693193483957)]

In [31]:
sorted(pagerank('wikipedia-example.txt', beta=0.85, n=10))

[('A', 0.027646509310404344),
 ('B', 0.3036191818465732),
 ('C', 0.30972957233259435),
 ('D', 0.03297885857776607),
 ('E', 0.06821469112858616),
 ('F', 0.03297885857776607),
 ('G', 0.01363636363636364),
 ('H', 0.01363636363636364),
 ('I', 0.01363636363636364),
 ('J', 0.01363636363636364),
 ('K', 0.01363636363636364)]
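
One sanity check on these results is whether the ranks still sum to 1. The sketch below sums the values printed above for `graph-2.txt` and `wikipedia-example.txt`. A shortfall would be consistent with a page that has no outlinks, since `calc_partials` passes none of that page's rank on. That explanation is an assumption here, because the contents of `wikipedia-example.txt` are not shown.

```python
# Ranks copied from the outputs above.
graph_2 = [0.3552329235133619, 0.18087715033826585, 0.26671408227310545,
           0.08133891194042742, 0.11583693193483957]
wiki = [0.027646509310404344, 0.3036191818465732, 0.30972957233259435,
        0.03297885857776607, 0.06821469112858616, 0.03297885857776607,
        0.01363636363636364, 0.01363636363636364, 0.01363636363636364,
        0.01363636363636364, 0.01363636363636364]

total_graph_2 = sum(graph_2)
total_wiki = sum(wiki)
print(round(total_graph_2, 6), round(total_wiki, 6))
```

The `graph-2.txt` ranks sum to 1, while the `wikipedia-example.txt` ranks sum to roughly 0.84. Pages G through K each hold exactly (1 - beta)/N = 0.15/11, which is what a page with no incoming links receives.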