# MIDS UC Berkeley, Machine Learning at Scale DATSCI W261 ASSIGNMENT 13

## HW 13.1: Spark implementation of basic PageRank

Write a basic Spark implementation of the iterative PageRank algorithm
that takes sparse adjacency lists as input.
Make sure that your implementation includes teleportation (each node receives
(1 − d)/N per iteration, where d is the damping factor and N is the number of
nodes in the network) and redistributes the mass of dangling nodes at each
iteration, so that the output of each iteration is correctly normalized (sums to 1).

[NOTE: The PageRank algorithm assumes that a random surfer (walker),
starting from a random web page, chooses the next page by clicking, with
probability d, on one of the hyperlinks in the current page, picked at random.
This probability d is the so-called ‘damping factor’, where d ∈ (0, 1).
Otherwise, with probability (1 − d), the surfer jumps to any web page in the
network. If a page is a dangling end, meaning it has no outgoing hyperlinks,
the random surfer selects a web page uniformly at random and “teleports” to
that page.]
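
The update rule described in the note can be checked without Spark. The following is a minimal plain-Python sketch, not the assignment solution: the function name `pagerank`, the `toy` graph, and the choice of 50 iterations are all assumptions made for illustration. Each node's new rank is the teleportation share (1 − d)/N plus d times the mass it receives over links and its equal share of the dangling mass.

```python
def pagerank(graph, d=0.85, iterations=50):
    """graph maps node -> {neighbor: weight}; dangling nodes map to {} or are absent."""
    # Collect every node, including those that appear only as link targets.
    nodes = set(graph)
    for nbrs in graph.values():
        nodes.update(nbrs)
    n = len(nodes)
    ranks = {node: 1.0 / n for node in nodes}

    for _ in range(iterations):
        received = {node: 0.0 for node in nodes}
        dangling = 0.0
        for node in nodes:
            nbrs = graph.get(node)
            if nbrs:
                # Split this node's rank across its out-links by weight.
                total = float(sum(nbrs.values()))
                for dst, weight in nbrs.items():
                    received[dst] += ranks[node] * weight / total
            else:
                # No out-links: the mass is spread uniformly over all nodes.
                dangling += ranks[node]
        ranks = {node: (1.0 - d) / n + d * (received[node] + dangling / n)
                 for node in nodes}
    return ranks


# Hypothetical four-node graph; D is dangling and has no in-links.
toy = {'A': {'B': 1, 'C': 1}, 'B': {'C': 1}, 'C': {'A': 1}, 'D': {}}
ranks = pagerank(toy)
print(sum(ranks.values()))  # should be 1 up to floating-point error
```

Because the link mass and the dangling mass together always add up to the previous total, the ranks stay normalized after every iteration, which is the property the assignment asks for.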

In your Spark solution, please use broadcast variables and caching to make sure your code 
is as efficient as possible.
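
As one possible reading of this requirement, the sketch below caches the RDDs that are reused and broadcasts the per-iteration constants. It is an illustration under stated assumptions rather than a reference solution: `pagerank_spark` is a hypothetical name, `sc` is assumed to be an existing SparkContext, `adjacency` is assumed to be an in-memory dict that lists every node (dangling nodes map to an empty list), and links are treated as unweighted.

```python
def pagerank_spark(sc, adjacency, d=0.85, iterations=10):
    """Sketch: adjacency maps every node to a list of out-neighbors ([] if dangling)."""
    # The link structure never changes, so cache it once and reuse it each iteration.
    links = sc.parallelize(list(adjacency.items())).cache()
    n = links.count()
    ranks = links.mapValues(lambda _: 1.0 / n)

    for _ in range(iterations):
        # (node, (neighbors, rank)); cached because it is used twice below.
        joined = links.join(ranks).cache()
        # Total rank currently sitting on dangling nodes.
        dangling = joined.filter(lambda kv: not kv[1][0]) \
                         .map(lambda kv: kv[1][1]).sum()
        # Broadcast the read-only constants that every task needs.
        params = sc.broadcast({'base': (1.0 - d) / n + d * dangling / n, 'd': d})
        contribs = joined.flatMap(
            lambda kv: [(dst, kv[1][1] / len(kv[1][0])) for dst in kv[1][0]]
        ).reduceByKey(lambda a, b: a + b)
        # Nodes without in-links get None from the left outer join.
        ranks = links.leftOuterJoin(contribs).mapValues(
            lambda v: params.value['base'] + params.value['d'] * (v[1] or 0.0)
        ).cache()
        ranks.count()       # materialize before releasing the intermediate RDD
        joined.unpersist()
    return dict(ranks.collect())
```

Broadcasting pays off mainly for larger read-only values; for a couple of scalars the gain is small, so treat this as a demonstration of the mechanism.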


As you build your code, use the test data at

s3://ucb-mids-mls-networks/PageRank-test.txt

(also available under the same file name in the Data subfolder for HW7 on Dropbox:
https://www.dropbox.com/sh/2c0k5adwz36lkcw/AAAAKsjQfF9uHfv-X9mCqr9wa?dl=0)

with the teleportation parameter set to 0.15 (that is, 1 − d, where the damping factor d
is set to 0.85), and crosscheck your work against the true result displayed in the
first image of the Wikipedia article:

https://en.wikipedia.org/wiki/PageRank

For reference, here are the corresponding PageRank probabilities:

<pre>
A,0.033
B,0.384
C,0.343
D,0.039
E,0.081
F,0.039
G,0.016
H,0.016
I,0.016
J,0.016
K,0.016
</pre>

Run this experiment locally first. Report the local configuration that you
used and how long (in minutes and seconds) your job takes to complete.

Repeat this experiment on AWS. Report the AWS cluster configuration that
you used and how long (in minutes and seconds) your job takes to complete.
(In your notebook, cat the cluster config file.)
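
One simple way to report the run time in minutes and seconds is to wrap the job in a wall-clock timer. The helper names `timed` and `format_elapsed` below are hypothetical, and the summation is only a stand-in for the actual Spark job. Since Spark transformations are lazy, the timed callable should end with an action (for example `collect` or `takeOrdered`), otherwise little real work is measured.

```python
import time


def format_elapsed(seconds):
    """Format a duration given in seconds as minutes and seconds."""
    minutes, secs = divmod(seconds, 60)
    return "%dm %04.1fs" % (minutes, secs)


def timed(job):
    """Run job() and return (result, elapsed wall-clock seconds)."""
    start = time.time()
    result = job()
    return result, time.time() - start


# Stand-in workload; replace the lambda with a function that runs the Spark job.
result, elapsed = timed(lambda: sum(range(1000)))
print("Job took " + format_elapsed(elapsed))
```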

In [1]:
import os
import sys

# Point SPARK_HOME at the local Spark installation (adjust the path for your machine).
spark_home = os.environ['SPARK_HOME'] = 'C:\\spark-1.6.1-bin-hadoop2.6'
#spark_home = os.environ['SPARK_HOME'] = '/home/w205/spark15'

if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
# Make PySpark and its bundled py4j importable.
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python', 'lib', 'py4j-0.9-src.zip'))
#sys.path.insert(0, os.path.join(spark_home, 'python', 'lib', 'py4j-0.8.2.1-src.zip'))
# Run the PySpark shell bootstrap, which creates sc and sqlContext (execfile is Python 2 only).
execfile(os.path.join(spark_home, 'python', 'pyspark', 'shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 2.7.11 (default, Jan 29 2016 14:26:21)
SparkContext available as sc, HiveContext available as sqlContext.


In [13]:
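import ast

input_file = 'PageRank-test.txt'
iter_num = 10
tele_factor = 0.15  # teleportation probability, 1 - d with damping factor d = 0.85

def parseInput(line):
    # Expected line format: "<node>\t<dict literal mapping neighbor -> link weight>"
    lineArr = line.split('\t')
    key_url = lineArr[0].strip()
    # literal_eval parses the dict literal without executing arbitrary code
    neighbors = ast.literal_eval(lineArr[1])
    return (key_url, neighbors)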

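def calcContribution(neighbors, rankVal, loss_mass_accumulator):
    # Split a node's rank among its out-links in proportion to link weight.
    if neighbors:
        link_total_weight = sum(neighbors.values())
        for link in neighbors:
            link_weight = neighbors[link]
            yield link, rankVal*1.0*link_weight/link_total_weight
    else:
        # Dangling node (no out-links): collect its mass for redistribution.
        loss_mass_accumulator.add(rankVal)

def reNormalizeRank(iter_result, loss_mass, total_nodes, tele_factor):
    # loss_mass is the per-node share of the dangling mass.
    # iter_result is None for nodes that received no contributions.
    if iter_result is None: iter_result = 0.0
    return (iter_result+loss_mass)*(1-tele_factor)+tele_factor*1.0/total_nodes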

# load the sparse input and add an entry for every node that appears only as a
# link target (such nodes get neighbors = None)
adj_rdd = sc.textFile(input_file).map(parseInput)
all_urls = adj_rdd.flatMap(lambda data: [(data[0], data[1])] + [(url, None) for url in data[1]]) \
        .reduceByKey(lambda a, b: a if a is not None else b) \
        .cache()  # reused by count() and by the join in every iteration

total_nodes = all_urls.count()

# initialize ranks uniformly
ranks = all_urls.mapValues(lambda neighbors: (neighbors, 1.0/total_nodes))

for i in range(iter_num):
    print("iter %s" % i)
    # fresh accumulator for the dangling mass lost in this iteration
    accum_loss_mass = sc.accumulator(0.0)

    iter_distribution = ranks \
            .flatMap(lambda data: calcContribution(data[1][0], data[1][1], accum_loss_mass)) \
            .reduceByKey(lambda a,b: a+b).cache()
    # Accumulators are only updated once an action runs, so force evaluation here.
    # Note: Spark may apply accumulator updates made inside transformations more
    # than once if a task or stage is re-executed.
    iter_distribution.first()
    loss_mass = accum_loss_mass.value/total_nodes

    ranks = all_urls.leftOuterJoin(iter_distribution) \
        .mapValues(lambda v: (v[0], reNormalizeRank(v[1], loss_mass, total_nodes, tele_factor)))

# take a look at top 11
for result in ranks.takeOrdered(11, key=lambda kv: -kv[1][1]):
    print("Node: %s | Rank: %s" % (result[0], result[1][1]))

iter 0
iter 1
iter 2
iter 3
iter 4
iter 5
iter 6
iter 7
iter 8
iter 9
Node: B | Rank: 0.363235948989
Node: C | Rank: 0.362883728039
Node: E | Rank: 0.0811452576255
Node: D | Rank: 0.03938466342
Node: F | Rank: 0.03938466342
Node: A | Rank: 0.0329301017862
Node: I | Rank: 0.0162071273441
Node: H | Rank: 0.0162071273441
Node: K | Rank: 0.0162071273441
Node: G | Rank: 0.0162071273441
Node: J | Rank: 0.0162071273441
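
The printed ranks can be crosschecked against the reference probabilities listed in the assignment text. The sketch below copies both sets of numbers from this notebook; the helper name `mismatches` and the tolerance of 0.005 are assumptions chosen for illustration.

```python
# Reference PageRank values from the assignment text.
reference = {'A': 0.033, 'B': 0.384, 'C': 0.343, 'D': 0.039, 'E': 0.081,
             'F': 0.039, 'G': 0.016, 'H': 0.016, 'I': 0.016, 'J': 0.016,
             'K': 0.016}

# Ranks printed above after 10 iterations.
computed = {'B': 0.363235948989, 'C': 0.362883728039, 'E': 0.0811452576255,
            'D': 0.03938466342, 'F': 0.03938466342, 'A': 0.0329301017862,
            'I': 0.0162071273441, 'H': 0.0162071273441, 'K': 0.0162071273441,
            'G': 0.0162071273441, 'J': 0.0162071273441}


def mismatches(computed, reference, tol):
    """Return the nodes whose computed rank differs from the reference by more than tol."""
    return sorted(node for node in reference
                  if abs(computed[node] - reference[node]) > tol)


print(mismatches(computed, reference, tol=0.005))  # ['B', 'C']
```

With this tolerance every node except B and C agrees with the reference, and the computed ranks sum to 1. The remaining gap of about 0.02 on B and C may indicate that 10 iterations are not enough for the ranks to converge on this graph.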
