# DATASCI W261: Machine Learning at Scale

* **Sayantan Satpati**
* **sayantan.satpati@ischool.berkeley.edu**
* **W261**
* **Week-13**
* **Assignment-13**
* **Date of Submission: 08-DEC-2015**

#  === Week 13 ASSIGNMENTS ===

##  Initialize Spark in Local Mode

In [1]:
SPARK_HOME="/Users/ssatpati/0-DATASCIENCE/TOOLS/spark-1.5.1-bin-hadoop2.6"

In [2]:
import os
import sys
def init_spark():
    #Escape L for line numbers
    spark_home = os.environ['SPARK_HOME'] = SPARK_HOME
    if not spark_home:
        raise ValueError('SPARK_HOME environment variable is not set')
    sys.path.insert(0,os.path.join(spark_home,'python'))
    sys.path.insert(0,os.path.join(spark_home,'python/lib/py4j-0.8.2.1-src.zip'))
    execfile(os.path.join(spark_home,'python/pyspark/shell.py'))

In [3]:
init_spark()

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 2.7.10 (default, Oct 19 2015 18:31:17)
SparkContext available as sc, HiveContext available as sqlContext.


## HW13.1 Spark implementation of basic PageRank
---

```
===HW 13.1: Spark implementation of basic PageRank===

Write a basic Spark implementation of the iterative PageRank algorithm
that takes sparse adjacency lists as input.
Make sure that your implementation utilizes teleportation ((1 - damping) / the number of nodes in the network),
and further, distributes the mass of dangling nodes with each iteration
so that the output of each iteration is correctly normalized (sums to 1).
[NOTE: The PageRank algorithm assumes that a random surfer (walker), starting from a random web page,
chooses the next page to which it will move by clicking at random, with probability d,
one of the hyperlinks in the current page. This probability is represented by a so-called
‘damping factor’ d, where d ∈ (0, 1). Otherwise, with probability (1 − d), the surfer
jumps to any web page in the network. If a page is a dangling end, meaning it has no
outgoing hyperlinks, the random surfer selects an arbitrary web page from a uniform
distribution and “teleports” to that page]

In your Spark solution, please use broadcast variables and caching to make sure your code is as efficient as possible.

As you build your code, use the following test data to check your implementation:

s3://ucb-mids-mls-networks/PageRank-test.txt

Set the teleportation parameter to 0.15 (1-d, where d, the damping factor, is set to 0.85), and crosscheck
your work with the true result, displayed in the first image
in the Wikipedia article:

https://en.wikipedia.org/wiki/PageRank

and here for reference are the corresponding resulting PageRank probabilities:

A,0.033
B,0.384
C,0.343
D,0.039
E,0.081
F,0.039
G,0.016
H,0.016
I,0.016
J,0.016
K,0.016

Run this experiment locally first. Report the local configuration that you used and how long in minutes and seconds it takes to complete your job.

Repeat this experiment on AWS. Report the AWS cluster configuration that you used and how long in minutes and seconds it takes to complete your job. (in your notebook, cat the cluster config file)
```
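
The update rule described in the assignment text can be checked on the test graph without Spark. The following is a minimal plain-Python sketch, not the Spark solution itself. It assumes that `PageRank-test.txt` encodes the 11-node example graph from the Wikipedia article with the edges listed in `GRAPH` (the file contents are not reproduced in this notebook, so the edge list is an assumption). The names `GRAPH`, `pagerank`, and `dangling_history` are illustrative only.

```python
# ASSUMPTION: edge list of the Wikipedia example graph; node 'A' has no out-links.
GRAPH = {
    'B': ['C'],
    'C': ['B'],
    'D': ['A', 'B'],
    'E': ['B', 'D', 'F'],
    'F': ['B', 'E'],
    'G': ['B', 'E'],
    'H': ['B', 'E'],
    'I': ['B', 'E'],
    'J': ['E'],
    'K': ['E'],
}


def pagerank(graph, iterations=100, tp=0.15):
    """Iterate PageRank with teleportation `tp` and dangling-mass redistribution."""
    # Nodes that only appear as link targets (such as 'A') must be counted too.
    nodes = set(graph)
    for targets in graph.values():
        nodes.update(targets)
    n = len(nodes)

    ranks = dict((node, 1.0 / n) for node in nodes)
    dangling_history = []
    for _ in range(iterations):
        incoming = dict((node, 0.0) for node in nodes)
        dangling_mass = 0.0
        for node in nodes:
            targets = graph.get(node, [])
            if targets:
                share = ranks[node] / len(targets)
                for target in targets:
                    incoming[target] += share
            else:
                # A dangling node spreads its rank evenly over all nodes.
                dangling_mass += ranks[node]
        dangling_history.append(dangling_mass)
        ranks = dict(
            (node, tp / n + (1 - tp) * (dangling_mass / n + incoming[node]))
            for node in nodes
        )
    return ranks, dangling_history


ranks, dangling_history = pagerank(GRAPH)
```

Under these assumptions the ranks sum to 1 after every iteration, and the first two recorded dangling masses (1/11 ≈ 0.0909 and ≈ 0.0593) agree with the values logged by the local Spark run in this notebook for iterations 1 and 2.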


In [33]:
%%writefile spark_13_1.py
import ast
import pprint
import sys
from pyspark import SparkContext
from pyspark import SparkConf

def u(s):
    return s.decode('utf-8')

def parse_line(line):
    tokens = line.split('\t')
    key = tokens[0]
    adj_list = ast.literal_eval(tokens[1])
    return (u(key), [u(k) for k,v in adj_list.iteritems()])

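def preproc(t):
    # Emit the node itself plus an empty adjacency list for every neighbor,
    # so that nodes with no outgoing links still appear after reduceByKey.
    l = [t]
    for x in t[1]:
        l.append((x, []))  # x is already unicode (decoded in parse_line)
    return l

def contributions(t):
    # t = (node, (adj_list, rank)), as produced by links.join(ranks)
    l = [(t[0], 0)]  # keep the node even if nothing links to it
    w = t[1][1]
    adj_list = t[1][0]
    if len(adj_list) == 0:
        # A dangling node sends its whole rank to the shared 'DANGLING' key
        l.append(('DANGLING', w))
    else:
        for n in adj_list:
            l.append((n, w/len(adj_list)))
    return l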

def page_rank(t, n, dangling_mass, tp=0.15):
    w = t[1]
    w = (tp / n) + (1 - tp) * ((dangling_mass/n) + w)
    return (t[0], w)

if __name__ == '__main__':
    print 'Number of arguments:', len(sys.argv), 'arguments.'
    print 'Argument List:', str(sys.argv)
    
    if len(sys.argv) != 3:
        print 'Incorrect number of arguments passed, Aborting...'
        sys.exit(1)
        
    # Init Spark Context
    #conf = SparkConf()
    sc = SparkContext(appName="Page Rank")
    
    lines = sc.textFile(sys.argv[1]).map(parse_line)
    #print '\n### Original Dataset:'
    #pprint.pprint(lines.sortByKey().collect())

    links = lines.flatMap(preproc).reduceByKey(lambda x, y: x + y).cache()
    #print '\n### Pre-Processed Dataset (Links):'
    #pprint.pprint(links.collect())

    n = links.count()
    
    ranks = links.map(lambda x: (x[0], float(1)/n))
    #print '\n### Inital Ranks:'
    #pprint.pprint(ranks.collect())

    sum_partial_diff_PR = float('inf')
    cnt = 1

    #while sum_partial_diff_PR > .005:
    while cnt <= 30:
        contribs = links.join(ranks).flatMap(contributions).reduceByKey(lambda x, y: x + y).cache()
        # lookup() returns a list; it is empty when the graph has no dangling nodes
        dangling = contribs.lookup('DANGLING')
        dangling_mass = dangling[0] if dangling else 0.0
        ranks_updated = contribs.filter(lambda x: x[0] != 'DANGLING').map(lambda x: page_rank(x, n, dangling_mass))
        print '\n[Iteration: {0}] Dangling Mass: {1}'.format(cnt, dangling_mass)
        
        #print 'Sum of Ranks: {0}'.format(ranks_updated.values().reduce(lambda x, y: x + y))
        #sum_partial_diff_PR = ranks.join(ranks_updated).map(lambda x: abs(x[1][0] - x[1][1])).reduce(lambda x, y: x + y)
        #print 'Difference in Ranks: {0}'.format(sum_partial_diff_PR)
        ranks = ranks_updated
        cnt += 1

    ranks.map(lambda x: (x[0],round(x[1],3))).saveAsTextFile(sys.argv[2])
    
    sc.stop()


Overwriting spark_13_1.py
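
To make the per-record logic of `spark_13_1.py` easier to follow, here is a small Spark-free walkthrough of one input line. It is a sketch under assumptions: the sample line `D<TAB>{'A': 1, 'B': 1}` is inferred from how `parse_line` splits on a tab and evaluates a dict literal, the functions are simplified re-statements (no unicode decoding, so they also run on Python 3), and `contributions` takes the node, its adjacency list, and its rank as separate arguments rather than the joined tuple used in the script.

```python
import ast


def parse_line(line):
    # "node<TAB>{neighbor: weight, ...}" -> (node, [neighbor, ...])
    key, raw = line.split('\t')
    return (key, sorted(ast.literal_eval(raw)))  # weights are ignored


def preproc(record):
    # Emit the record plus an empty adjacency list for every neighbor,
    # so that nodes without out-links are not lost.
    node, neighbors = record
    return [record] + [(nb, []) for nb in neighbors]


def contributions(node, neighbors, rank):
    out = [(node, 0.0)]  # keep the node even if nothing links to it
    if not neighbors:
        out.append(('DANGLING', rank))  # whole rank goes to the shared key
    else:
        share = rank / len(neighbors)
        out.extend((nb, share) for nb in neighbors)
    return out


# ASSUMPTION: illustrative input line and rank value, not taken from the data file.
record = parse_line("D\t{'A': 1, 'B': 1}")
expanded = preproc(record)
d_contribs = contributions('D', record[1], 0.5)
a_contribs = contributions('A', [], 0.5)
```

Here `expanded` contains `('D', ['A', 'B'])` followed by empty entries for `A` and `B`, `d_contribs` splits the rank 0.5 into two shares of 0.25, and `a_contribs` routes the full 0.5 to the `'DANGLING'` key.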


In [34]:
!chmod a+x spark_13_1.py

### Running it locally

In [35]:
!rm -rf output_13_1
!time $SPARK_HOME/bin/spark-submit --name "Page Rank" --master local[4] ./spark_13_1.py PageRank-test.txt output_13_1
!cat output_13_1/part-000* | sort
!end=$(date +%s)

Number of arguments: 3 arguments.
Argument List: ['/Users/ssatpati/0-DATASCIENCE/DEV/github/ml/w261/wk13/./spark_13_1.py', 'PageRank-test.txt', 'output_13_1']
2015-12-06 14:47:19.083 java[35243:22269489] Unable to load realm mapping info from SCDynamicStore

[Iteration: 1] Dangling Mass: 0.0909090909091

[Iteration: 2] Dangling Mass: 0.0592975206612

[Iteration: 3] Dangling Mass: 0.0379464062109

[Iteration: 4] Dangling Mass: 0.0640190695934

[Iteration: 5] Dangling Mass: 0.0375959647951

[Iteration: 6] Dangling Mass: 0.0386749363905

[Iteration: 7] Dangling Mass: 0.0341177257382

[Iteration: 8] Dangling Mass: 0.0346526855821

[Iteration: 9] Dangling Mass: 0.0332641479909

[Iteration: 10] Dangling Mass: 0.0332687068063

[Iteration: 11] Dangling Mass: 0.0329301017862

[Iteration: 12] Dangling Mass: 0.0329194443643

[Iteration: 13] Dangling Mass: 0.0328282893463
                                                                                                                               

### Running it on AWS

#### Steps followed to run this on AWS:

1. Create the Spark cluster
2. Copy the Python file to the Spark master
3. Run the program from the Spark master

```
aws --region us-west-2 ec2 create-key-pair --key-name w261_key --query 'KeyMaterial' --output text > w261_key.pem

aws emr create-cluster --name "spark1" --ami-version 3.8 --applications Name=Spark --ec2-attributes KeyName=w261_key --log-uri s3://ucb-mids-mls-sayantan-satpati/spark/logs --instance-type m3.xlarge --instance-count 3 --use-default-roles

scp -i /Users/ssatpati/0-DATASCIENCE/DEV/AWS/keys_w261/w261_key.pem spark_13_1.py hadoop@ec2-52-27-224-148.us-west-2.compute.amazonaws.com:/home/hadoop

/home/hadoop/spark/bin/spark-submit --master yarn-cluster /home/hadoop/spark_13_1.py s3n://ucb-mids-mls-networks/PageRank-test.txt s3n://ucb-mids-mls-sayantan-satpati/spark/hw13_1
```

In [36]:
!rm -rf aws_output_13_1
!aws s3 cp --recursive s3://ucb-mids-mls-sayantan-satpati/spark/hw13_1 aws_output_13_1
!cat aws_output_13_1/part-* | sort

download: s3://ucb-mids-mls-sayantan-satpati/spark/hw13_1/part-00007 to aws_output_13_1/part-00007
download: s3://ucb-mids-mls-sayantan-satpati/spark/hw13_1/part-00000 to aws_output_13_1/part-00000
download: s3://ucb-mids-mls-sayantan-satpati/spark/hw13_1/part-00003 to aws_output_13_1/part-00003
download: s3://ucb-mids-mls-sayantan-satpati/spark/hw13_1/part-00008 to aws_output_13_1/part-00008
download: s3://ucb-mids-mls-sayantan-satpati/spark/hw13_1/part-00001 to aws_output_13_1/part-00001
download: s3://ucb-mids-mls-sayantan-satpati/spark/hw13_1/part-00006 to aws_output_13_1/part-00006
download: s3://ucb-mids-mls-sayantan-satpati/spark/hw13_1/_SUCCESS to aws_output_13_1/_SUCCESS
download: s3://ucb-mids-mls-sayantan-satpati/spark/hw13_1/part-00004 to aws_output_13_1/part-00004
download: s3://ucb-mids-mls-sayantan-satpati/spark/hw13_1/part-00002 to aws_output_13_1/part-00002
download: s3://ucb-mids-mls-sayantan-satpati/spark/hw13_1/part-00009 to aws_output_13_1/part-00009
download: s3:/

In [62]:
%%writefile spark_13_2.py
import ast
import pprint
import sys
from pyspark import SparkContext
from pyspark import SparkConf

def u(s):
    return s.decode('utf-8')

def parse_line(line):
    tokens = line.split('\t')
    key = tokens[0]
    adj_list = ast.literal_eval(tokens[1])
    return (u(key), [u(k) for k,v in adj_list.iteritems()])

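def preproc(t):
    # Emit the node itself plus an empty adjacency list for every neighbor,
    # so that nodes with no outgoing links still appear after reduceByKey.
    l = [t]
    for x in t[1]:
        l.append((x, []))  # x is already unicode (decoded in parse_line)
    return l

def contributions(t):
    # t = (node, (adj_list, rank)), as produced by links.join(ranks)
    l = [(t[0], 0)]  # keep the node even if nothing links to it
    w = t[1][1]
    adj_list = t[1][0]
    if len(adj_list) == 0:
        # A dangling node sends its whole rank to the shared 'DANGLING' key
        l.append(('DANGLING', w))
    else:
        for n in adj_list:
            l.append((n, w/len(adj_list)))
    return l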

def page_rank(t, n, dangling_mass, tp=0.15):
    w = t[1]
    w = (tp / n) + (1 - tp) * ((dangling_mass/n) + w)
    return (t[0], w)

if __name__ == '__main__':
    sys.stderr.write('\nNumber of arguments: {0}'.format(len(sys.argv)))
    sys.stderr.write('\nArgument List: {0}'.format(sys.argv))
    
    if len(sys.argv) != 4:
        print 'Incorrect number of arguments passed, Aborting...'
        sys.exit(1)
        
    # Init Spark Context
    #conf = SparkConf()
    sc = SparkContext(appName="Page Rank")
    
    lines = sc.textFile(sys.argv[1]).map(parse_line)

    links = lines.flatMap(preproc).reduceByKey(lambda x, y: x + y).cache()
    n = links.count()
    
    ranks = links.map(lambda x: (x[0], float(1)/n))

    sum_partial_diff_PR = float('inf')
    cnt = 1

    #while sum_partial_diff_PR > .005:
    while cnt <= int(sys.argv[3]):
        contribs = links.join(ranks).flatMap(contributions).reduceByKey(lambda x, y: x + y).cache()
        # lookup() returns a list; it is empty when the graph has no dangling nodes
        dangling = contribs.lookup('DANGLING')
        dangling_mass = dangling[0] if dangling else 0.0
        ranks_updated = contribs.filter(lambda x: x[0] != 'DANGLING').map(lambda x: page_rank(x, n, dangling_mass))
        sys.stderr.write('\n[Iteration: {0}] Dangling Mass: {1}'.format(cnt, dangling_mass))
        
        ranks = ranks_updated
        cnt += 1

    sc.parallelize(ranks.map(lambda x: (x[0],round(x[1],3))).takeOrdered(3, key=lambda x: -x[1])).saveAsTextFile(sys.argv[2])
    
    sc.stop()


Overwriting spark_13_2.py
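
The last step of `spark_13_2.py` keeps only the three highest-ranked nodes by calling `takeOrdered` with the negated rank as the sort key. A rough plain-Python analogue of that selection is sketched below using `heapq.nsmallest`. The input values are the reference probabilities quoted in the HW 13.1 assignment text, and `top_n` is a hypothetical helper introduced only for this illustration.

```python
import heapq

# Reference PageRank values quoted in the HW 13.1 assignment text.
ranks = [('A', 0.033), ('B', 0.384), ('C', 0.343), ('D', 0.039),
         ('E', 0.081), ('F', 0.039), ('G', 0.016), ('H', 0.016),
         ('I', 0.016), ('J', 0.016), ('K', 0.016)]


def top_n(pairs, n):
    # The n smallest negated ranks are the n largest ranks, highest first.
    return heapq.nsmallest(n, pairs, key=lambda kv: -kv[1])


top3 = top_n(ranks, 3)
```

On the converged reference values this yields B, C, and E in that order. The notebook run of this script stops after 10 iterations, so it reports 0.363 for both B and C rather than the converged values.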


In [63]:
!chmod u+x spark_13_2.py

In [64]:
out_dir = "output_13_2"
!rm -rf $out_dir
!time $SPARK_HOME/bin/spark-submit --name "Page Rank" --master local[4] ./spark_13_2.py \
                PageRank-test.txt $out_dir 10
!cat $out_dir/part-000* | sort
!end=$(date +%s)


Number of arguments: 4
Argument List: ['/Users/ssatpati/0-DATASCIENCE/DEV/github/ml/w261/wk13/./spark_13_2.py', 'PageRank-test.txt', 'output_13_2', '10']
2015-12-06 18:45:48.110 java[55494:22389589] Unable to load realm mapping info from SCDynamicStore

[Iteration: 1] Dangling Mass: 0.0909090909091
[Iteration: 2] Dangling Mass: 0.0592975206612
[Iteration: 3] Dangling Mass: 0.0379464062109
[Iteration: 4] Dangling Mass: 0.0640190695934
[Iteration: 5] Dangling Mass: 0.0375959647951
[Iteration: 6] Dangling Mass: 0.0386749363905
[Iteration: 7] Dangling Mass: 0.0341177257382
[Iteration: 8] Dangling Mass: 0.0346526855821
[Iteration: 9] Dangling Mass: 0.0332641479909
[Iteration: 10] Dangling Mass: 0.0332687068063
real	0m12.018s
user	0m16.799s
sys	0m3.164s
(u'B', 0.363)
(u'C', 0.363)
(u'E', 0.081)
