#### Name: `Jeff Scanlon`
#### AndrewID: `jscanlo2`

Below is an outline of PySpark code for calculating the PageRank of a graph expressed in the edge vector representation. For this part of the assignment:

1. Complete the code below
2. Write doc string comments for all functions documenting what they do
3. Test your code on other graphs we've worked with in previous assignments

I recommend reviewing (i) the Excel spreadsheet calculation we did for simulating the PageRank calculation and (ii) the slides on PageRank. Ensure you understand the algorithm well before starting on this exercise.

Rather than just stating "Write PySpark code to calculate PageRank", I've provided some helper functions and sample output along the way to provide guidance. To facilitate grading, do stay with these functions.
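For reference, the per-iteration update that the helper functions below build up can be written as follows. The notation is introduced here for illustration and is not taken from the slides: $N$ is the number of pages, $d_i$ is the number of outgoing links of page $i$, $\beta$ corresponds to the `beta` argument, and the sum runs over pages $i$ that link to page $j$.

$$
r_j^{(t+1)} = \frac{1-\beta}{N} + \beta \sum_{i \to j} \frac{r_i^{(t)}}{d_i}
$$

The first term is the share a page receives from random jumps, and the second is the share it receives from clicks on incoming links.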

In [243]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://apache.claz.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [244]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

In [245]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()  # reuses the running context, so re-running this cell does not raise ValueError

In [246]:
from google.colab import files
uploaded = files.upload()

Saving graph-1.txt to graph-1 (1).txt


# page rank

In [247]:
def parse_line(line):
  '''Takes an RDD of text lines and splits each line on whitespace, giving an RDD of token lists'''
  ans = line.map(lambda l: l.split())
  return ans

In [248]:
def read_graph(fname):
  '''Reads in a file, applies the line parsing function to create a graph RDD'''
  line = sc.textFile(fname)
  p = parse_line(line)
  graph = p.map(lambda l: (l[0], l[1:]))
  return graph

In [249]:
read_graph('graph-1.txt').collect()

[('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]
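The same parsing step can be sketched without Spark to make the expected input format explicit. The file contents below are an assumption inferred from the collected output above (one line per page: the page name followed by the pages it links to), and `sample_text` is a hypothetical stand-in for `graph-1.txt`.

```python
# Assumed contents of graph-1.txt, inferred from the collected output above
sample_text = """A B C
B C
C A
D C"""

graph = []
for line in sample_text.splitlines():
    tokens = line.split()                   # same split as parse_line
    graph.append((tokens[0], tokens[1:]))   # (page, [neighbors]) as in read_graph

print(graph)
# [('A', ['B', 'C']), ('B', ['C']), ('C', ['A']), ('D', ['C'])]
```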

In [250]:
def init_ranks(graph):
  '''Takes the graph from read_graph function and gives every page the same
  initial rank, 1/(number of pages)'''
  n = graph.count()
  pages = graph.map(lambda i: (i[0], 1.0 / n))
  return pages

In [251]:
g = read_graph('graph-1.txt')
init_ranks(g).collect()

[('A', 0.25), ('B', 0.25), ('C', 0.25), ('D', 0.25)]

In [252]:
# I'm not sure what exactly professor wanted us to do with this function or
# how he wanted us to use it with other functions.
# In the sample functions he provided, it doesn't indicate how this
# function is supposed to interact or be passed to other functions.

def calc_partials(x):
  '''Takes in a graph from read_graph function and calculates the fraction of page rank
  each page will send to each of its neighbors. Every page is also paired with 0 so
  that pages with no incoming links still appear in the result.'''
  # Each page sends an equal share, 1/(number of neighbors), to every neighbor
  partials = x.flatMap(lambda p: [(dest, 1/len(p[1])) for dest in p[1]])
  # One (page, 0) entry per page
  zeros = x.map(lambda p: (p[0], 0))
  return partials.union(zeros)

x = read_graph('graph-1.txt')
calc_partials(x).collect()

[('B', 0.5),
 ('C', 0.5),
 ('C', 1.0),
 ('A', 1.0),
 ('C', 1.0),
 ('A', 0),
 ('B', 0),
 ('C', 0),
 ('D', 0)]

In [253]:
def calc_contribs(ranks, graph):
  '''Calculates the rank contribution each page receives from other pages (clicking)
  by multiplying each page's current rank by the fraction (partial) it sends to
  each neighbor'''
  # Use the arguments rather than the globals g and r, and join on the page name
  # rather than zip, which depends on both RDDs having the same element order.
  # joined elements look like (page, (neighbors, rank))
  joined = graph.join(ranks)
  # Each page splits its current rank evenly among its neighbors
  sent = joined.flatMap(lambda p: [(dest, p[1][1] / len(p[1][0])) for dest in p[1][0]])
  # One (page, 0) entry per page so pages with no incoming links are kept
  zeros = graph.map(lambda p: (p[0], 0))
  # Note: element order after a join is not guaranteed
  return sent.union(zeros)

[('B', 0.125),
 ('C', 0.125),
 ('C', 0.25),
 ('A', 0.25),
 ('C', 0.25),
 ('A', 0),
 ('B', 0),
 ('C', 0),
 ('D', 0)]

Note that when calculating the contributions, A, B, C, and D are each also paired with 0. Why?
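One way to explore this question is to sum the contributions by page with and without the zero entries. The sketch below uses plain Python rather than Spark; `sum_by_key` is a hypothetical stand-in for `reduceByKey(lambda x, y: x + y)`, and the contribution values are copied from the sample output above.

```python
from collections import defaultdict

# Contributions for graph-1 as shown above, without the zero entries
contribs = [('B', 0.125), ('C', 0.125), ('C', 0.25), ('A', 0.25), ('C', 0.25)]
pages = ['A', 'B', 'C', 'D']

def sum_by_key(pairs):
    """Plain-Python stand-in for reduceByKey(lambda x, y: x + y)."""
    totals = defaultdict(float)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)

without_zeros = sum_by_key(contribs)
with_zeros = sum_by_key(contribs + [(p, 0) for p in pages])

print(sorted(without_zeros))  # ['A', 'B', 'C']
print(sorted(with_zeros))     # ['A', 'B', 'C', 'D']
```

Page D has no incoming links, so without its zero entry it drops out of the summed result and would never receive its random-jump share in the rank step.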

In [254]:
r = init_ranks(g)
c1 = calc_contribs(r, g)
c1.collect()

[('B', 0.125),
 ('C', 0.125),
 ('C', 0.25),
 ('A', 0.25),
 ('C', 0.25),
 ('A', 0),
 ('B', 0),
 ('C', 0),
 ('D', 0)]

In [284]:
def calc_ranks(contribs, num_nodes, beta):
  '''Calculates the total page rank (for clicking and jumping) for all pages, but
  only for one iteration.'''
  # for each page, rank = (1-beta)/num_nodes + sum(contribs)*beta
  q = contribs.map(lambda c: (c[0], (c[1]*beta)))
  s = q.reduceByKey(lambda x,y: x+y)
  s3 = s.map(lambda c: (c[0], (c[1] + (1-beta)/num_nodes)))
  return s3

calc_ranks(c1, num_nodes=g.count(), beta=0.85).collect()

[('B', 0.14375), ('A', 0.25), ('C', 0.56875), ('D', 0.037500000000000006)]

In [None]:
calc_ranks(c1, num_nodes=g.count(), beta=0.85).collect()

[('C', 0.56875), ('A', 0.25), ('B', 0.14375), ('D', 0.037500000000000006)]

In [297]:
def pagerank(fname, beta=0.85, n=10):
  '''Uses the previous functions to iterate through the page rank calculation n times so
  that ranks stabilize, then returns the final ranks as a list of (page, rank) pairs.'''
  g = read_graph(fname)
  r = init_ranks(g)
  nodes = g.count()  # the graph does not change, so count once
  for i in range(n):
    c1 = calc_contribs(r, g)
    r = calc_ranks(c1, num_nodes=nodes, beta=beta)  # pass beta through instead of hard-coding 0.85
  return r.collect()

# Element order in the result is not guaranteed; the next cell sorts it.
pagerank('graph-1.txt', beta=0.85, n=10)

In [300]:
sorted(pagerank('graph-1.txt', beta=0.85, n=10))

[('A', 0.375054382302053),
 ('B', 0.1949370588413849),
 ('C', 0.3925085588565621),
 ('D', 0.037500000000000006)]
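As a cross-check that does not depend on Spark, the following is a minimal plain-Python sketch of the same ten iterations. `pagerank_reference` and `graph_1` are hypothetical names introduced here, and the sketch assumes every page has at least one outgoing link, which holds for `graph-1.txt`.

```python
def pagerank_reference(graph, beta=0.85, n=10):
    """Plain-Python PageRank over a dict of page -> list of out-neighbors."""
    num_nodes = len(graph)
    ranks = {page: 1.0 / num_nodes for page in graph}
    for _ in range(n):
        # Start every page at zero so pages without incoming links are kept
        sums = {page: 0.0 for page in graph}
        for page, neighbors in graph.items():
            share = ranks[page] / len(neighbors)  # rank split evenly among neighbors
            for dest in neighbors:
                sums[dest] += share
        # Random-jump share plus damped click share
        ranks = {page: (1 - beta) / num_nodes + beta * total
                 for page, total in sums.items()}
    return ranks

graph_1 = {'A': ['B', 'C'], 'B': ['C'], 'C': ['A'], 'D': ['C']}
ranks = pagerank_reference(graph_1)
for page in sorted(ranks):
    print(page, round(ranks[page], 6))
# A 0.375054
# B 0.194937
# C 0.392509
# D 0.0375
```

The rounded values agree with the sorted output above, and the ranks sum to 1, which is a quick sanity check worth repeating on the other graphs from previous assignments.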