In [2]:
import os
import sys
#Escape L for line numbers
# Bootstrap PySpark inside the notebook kernel.
# Honor a SPARK_HOME that is already exported and only fall back to the local
# install when it is unset or empty (the original chained assignment always
# overwrote the environment, which made the validity check unreachable).
DEFAULT_SPARK_HOME = '/Users/leiyang/Downloads/spark-1.6.1-bin-hadoop2.6'
spark_home = os.environ.get('SPARK_HOME') or DEFAULT_SPARK_HOME
os.environ['SPARK_HOME'] = spark_home
if not os.path.isdir(spark_home):
    raise ValueError('SPARK_HOME does not point to a directory: %s' % spark_home)
sys.path.insert(0, os.path.join(spark_home, 'python'))
# The bundled py4j version differs between Spark releases, so pick up whichever
# py4j-*-src.zip this distribution ships instead of hard-coding one version.
py4j_lib_dir = os.path.join(spark_home, 'python', 'lib')
if os.path.isdir(py4j_lib_dir):
    for archive_name in sorted(os.listdir(py4j_lib_dir)):
        if archive_name.startswith('py4j-') and archive_name.endswith('-src.zip'):
            sys.path.insert(0, os.path.join(py4j_lib_dir, archive_name))
# shell.py creates the SparkContext (`sc`) and `sqlContext` in this namespace
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 2.7.9 (default, Dec 15 2014 10:37:34)
SparkContext available as sc, HiveContext available as sqlContext.


In [11]:
import numpy as np

# Distribute 1000 uniform random samples as an RDD
dataRDD = sc.parallelize(np.random.random_sample(1000))
# Double every sample
data2X = dataRDD.map(lambda value: value * 2)
# Keep only the doubled samples strictly above 1.0
dataGreaterThan1 = data2X.filter(lambda value: value > 1.0)
# Mark the filtered RDD for caching; later cells reuse it through this name
cachedRDD = dataGreaterThan1.cache()

In [12]:
# Sanity check: cachedRDD only holds values > 1.0, so nothing below 1 should remain
cachedRDD.filter(lambda value: value < 1).count()

0

In [13]:
# Peek at the first 10 elements of the doubled (unfiltered) RDD
data2X.take(10)

[0.26842824317966052,
 0.22885512655070683,
 0.9758237846106288,
 0.33859930537719629,
 0.0070283515932643148,
 1.9340366746533655,
 0.14436696744010025,
 0.16345393294606159,
 0.19683340264192073,
 0.45103274216591682]

### Transformation test

In [None]:
# Sample (key, value) pairs used to compare two ways of collecting values per key
x = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ('a', 34), ('b', 8)])

def add1(a, b):
    """mergeValue step: append the single value `b` to the partial list `a`."""
    return a + [b]

def add2(a, b):
    """mergeCombiners step: concatenate two partial lists."""
    return a + b

def test(a):
    """createCombiner step: wrap the first value seen for a key in a list."""
    return [a]

# Per-key list of values built with combineByKey
b = x.combineByKey(test, add1, add2).collect()
# reduceByKey expects an associative function whose result has the same type as
# its inputs. The previous `[a]+[b]` reducer re-wrapped partial results and so
# produced nested lists whose shape depended on partitioning. Lifting every
# value into a list first makes plain list concatenation a valid reducer and
# yields the same per-key lists as the combineByKey version (element order
# may differ).
c = x.mapValues(test).reduceByKey(add2).collect()

### Load data from HDFS

In [3]:
# Read the toy PageRank adjacency list from HDFS, one record per line
y = sc.textFile('hdfs://localhost:9000/user/leiyang/PageRank-test.txt')
# cache() marks the RDD for caching and returns the same RDD, so `y` is unchanged
y.cache()
# Display all records in the notebook
y.collect()

[u"B\t{'C': 1}",
 u"C\t{'B': 1}",
 u"D\t{'A': 1, 'B': 1}",
 u"E\t{'D': 1, 'B': 1, 'F': 1}",
 u"F\t{'B': 1, 'E': 1}",
 u"G\t{'B': 1, 'E': 1}",
 u"H\t{'B': 1, 'E': 1}",
 u"I\t{'B': 1, 'E': 1}",
 u"J\t{'E': 1}",
 u"K\t{'E': 1}"]

### PageRank

In [4]:
from time import time
from datetime import datetime

def initialize(line):
    """Parse one adjacency-list record and emit its initial PageRank contributions.

    Args:
        line: a text record made of a node id, a tab, and a Python dict
            literal whose keys are the node's out-links,
            e.g. "B<TAB>{'C': 1}".

    Returns:
        A list holding one (neighbor id, mass) pair per out-link, each carrying
        an equal share 1.0 / (number of out-links), followed by
        (node id, node) where node is {'a': [out-links], 'p': 0}. Surrounding
        double quotes are stripped from the emitted node id. A record with an
        empty adjacency dict yields only the node pair.

    Raises:
        ValueError: if the record has no tab separator, or the adjacency text
            is not a plain Python literal (malformed text may surface as
            SyntaxError instead).
    """
    # Imported locally so the function stays self-contained when shipped to executors.
    import ast
    # parse line
    nid, adj_text = line.strip().split('\t', 1)
    # literal_eval only accepts literals, so a crafted input record cannot run
    # arbitrary code the way the former `exec` statement allowed.
    adj = ast.literal_eval(adj_text.strip())
    # initialize node struct
    node = {'a': list(adj.keys()), 'p': 0}
    contributions = []
    # Guard against an empty adjacency dict, which used to raise ZeroDivisionError.
    if node['a']:
        rankMass = 1.0 / len(node['a'])
        contributions = [(m, rankMass) for m in node['a']]
    # emit pageRank mass and node
    return contributions + [(nid.strip('"'), node)]

def accumulateMass(a, b):
    """Reducer that merges two values emitted for the same node id.

    Each argument is either a bare float (PageRank mass) or a node dict with a
    'p' entry. Two floats are summed; a float paired with a node dict is added
    to that dict's 'p' in place and the dict is returned.
    """
    # `a` is not a bare mass: treat it as the node struct and fold `b` into it.
    if not isinstance(a, float):
        a['p'] += b
        return a
    # `a` is a bare mass from here on.
    if isinstance(b, float):
        return a + b
    b['p'] += a
    return b
    
def getDangling(node):
    """Turn a key that only received mass into a proper node with no out-links.

    `node` is a (node id, value) pair. When the value is a bare float, no node
    struct was emitted for that id: the global `nDangling` counter is
    incremented and a node with an empty adjacency list and that float as 'p'
    is returned. Any other pair is returned unchanged.
    """
    global nDangling
    payload = node[1]
    if not isinstance(payload, float):
        return node
    nDangling += 1
    return (node[0], {'a': [], 'p': payload})
    
def redistributeMass(node):
    """Apply the dangling-mass share and the damping/teleport terms to a node.

    Reads the globals `p_dangling` (an object exposing `.value`), `damping`
    and `alpha`, overwrites the node's 'p' in place with
    (p_dangling.value + p) * damping + alpha, and returns the same pair.
    """
    state = node[1]
    with_dangling_share = p_dangling.value + state['p']
    state['p'] = with_dangling_share * damping + alpha
    return node
    
def distributeMass(node):
    """Split a node's current mass evenly over its out-links.

    `node` is a (node id, {'a': out-links, 'p': mass}) pair. The node's 'p' is
    reset to 0 in place. With out-links present, the result is one
    (neighbor, share) pair per link followed by the node itself. With none,
    the mass is added to the global `lossMass` and only the node is returned.
    """
    global lossMass
    state = node[1]
    mass, adj = state['p'], state['a']
    state['p'] = 0
    fanout = len(adj)
    if fanout:
        share = mass / fanout
        emitted = [(target, share) for target in adj]
        emitted.append(node)
        return emitted
    # No out-links: nothing to send, so record the mass as lost.
    lossMass += mass
    return [node]

def logTime():
    """Return the current local date and time as a string, for log prefixes."""
    timestamp = datetime.now()
    return str(timestamp)


### Driver

In [5]:
from operator import add

# Driver for the iterative PageRank job. Relies on `sc`, `time` and the helper
# functions (initialize, accumulateMass, getDangling, redistributeMass,
# distributeMass, logTime) already being defined in this session.
# load the graph
graph_file = sc.textFile('hdfs://localhost:9000/user/leiyang/PageRank-test.txt')

# Accumulators updated inside the helpers: nDangling is incremented in
# getDangling, lossMass in distributeMass for nodes without out-links.
# NOTE(review): both are updated inside transformations rather than actions;
# Spark may re-apply such updates if a partition is recomputed -- confirm the
# cache() calls below are enough to keep the counts exact.
nDangling = sc.accumulator(0)
lossMass = sc.accumulator(0.0)
# damping and alpha are read as globals by redistributeMass
damping = 0.85
alpha = 1 - damping
# nTop: number of ranked nodes printed at the end.
# nIter: total passes; the initialization below counts as the first one, so
# the loop runs nIter-1 times and labels its passes starting at 2.
nTop, nIter = 100, 10
start = time()
print '%s: start PageRank initialization ...' %(logTime())
# Parse every record, merge the values emitted per node id, and give ids that
# only received mass a node struct of their own.
graph = graph_file.flatMap(initialize).reduceByKey(accumulateMass).map(getDangling).cache()
# get graph size (this action is also what populates nDangling)
G = graph.count()
# broadcast dangling mass for redistribution
p_dangling = sc.broadcast(1.0*nDangling.value/G)
graph = graph.map(redistributeMass)
print '%s: initialization completed, dangling node(s): %d, total nodes: %d' %(logTime(), nDangling.value, G)

for i in range(nIter-1):
    print '%s: running iteration %d ...' %(logTime(), i+2)
    # reset the lost-mass counter so it only reflects this pass
    lossMass.value = 0.0
    graph = graph.flatMap(distributeMass).reduceByKey(accumulateMass).cache() #checkpoint()?
    # need to call an action here in order to have loss mass
    graph.count()        
    print '%s: redistributing loss mass: %.4f' %(logTime(), lossMass.value)
    # share of the lost mass handed back to every node
    p_dangling = sc.broadcast(lossMass.value/G)
    # NOTE(review): redistributeMass reads the global p_dangling; this map is
    # presumably evaluated lazily by the next action, so the rebinding order
    # here matters -- verify before reordering these statements.
    graph = graph.map(redistributeMass)

# Sum the 'p' of every node: the first lambda folds a node into a running
# total, the second merges partial totals.
totalMass = graph.aggregate(0, (lambda x, y: y[1]['p'] + x), (lambda x, y: x+y))
print '%s: normalized weight of the graph: %.4f' %(logTime(), totalMass/G)
print '%s: PageRanking completed in %.2f minutes, top %d are:' %(logTime(), (time()-start)/60.0, nTop)
# Rank per node divided by G, sorted in descending order, first nTop entries
print graph.map(lambda n:(n[0],n[1]['p']/G)).sortBy(lambda n: n[1], ascending=False).take(nTop)


2016-04-15 21:48:28.606817: start PageRank initialization ...
2016-04-15 21:48:29.283494: initialization completed, dangling node(s): 1, total nodes: 11
2016-04-15 21:48:29.284133: running iteration 2 ...
2016-04-15 21:48:29.381162: redistributing loss mass: 0.6523
2016-04-15 21:48:29.386752: running iteration 3 ...
2016-04-15 21:48:29.470348: redistributing loss mass: 0.4174
2016-04-15 21:48:29.475617: running iteration 4 ...
2016-04-15 21:48:29.557635: redistributing loss mass: 0.7042
2016-04-15 21:48:29.562555: running iteration 5 ...
2016-04-15 21:48:29.647760: redistributing loss mass: 0.4136
2016-04-15 21:48:29.653744: running iteration 6 ...
2016-04-15 21:48:29.734994: redistributing loss mass: 0.4254
2016-04-15 21:48:29.741772: running iteration 7 ...
2016-04-15 21:48:29.823619: redistributing loss mass: 0.3753
2016-04-15 21:48:29.828607: running iteration 8 ...
2016-04-15 21:48:29.918053: redistributing loss mass: 0.3812
2016-04-15 21:48:29.924970: running iteration 9 ...
2016

In [178]:
# Total PageRank mass currently held by the graph.
# seqOp folds one (node id, node) pair into a running total.
seqOp = (lambda total, node: node[1]['p'] + total)
# comOp merges two partial totals.
comOp = (lambda left, right: left + right)
# Use the named operators instead of repeating the same lambdas inline
# (they were previously defined but never used).
graph.aggregate(0, seqOp, comOp)

10.999999999999998

In [180]:
# Alternative total-mass check: sum(graph.map(lambda n:n[1]['p']).collect())
# Show every node id with its rank divided by the graph size G
graph.map(lambda entry: (entry[0], entry[1]['p'] / G)).collect()

[('A', 0.03293010178620472),
 ('C', 0.36288372803871793),
 (u'E', 0.08114525762548769),
 (u'G', 0.016207127344124005),
 (u'I', 0.016207127344124005),
 (u'K', 0.016207127344124005),
 (u'H', 0.016207127344124005),
 (u'J', 0.016207127344124005),
 ('B', 0.3632359489889102),
 ('D', 0.03938466342002967),
 ('F', 0.03938466342002967)]

In [8]:
#nodes.sortBy(lambda l: l[1][0], ascending=False).take(10)
# Dump the raw lines of the local toy index file
print(sc.textFile('file:///Users/leiyang/Downloads/toy_index.txt').collect())

[u"'Node_a\tA\t2\t0", u"'Node_b\tB\t1\t0", u"'Node_c\tC\t1\t0", u"'Node_d\tD\t1\t0", u"'Node_e\tE\t2\t0", u"'Node_f\tF\t1\t0", u"'Node_g\tG\t1\t0", u"'Node_h\tH\t1\t0", u"'Node_i\tI\t1\t0", u"'Node_j\tJ\t1\t0", u"'Node_k\tK\t1\t0", u"'crap_k\tp\t1\t0"]


### Invoke Spark

In [None]:
# Submit PageRankDriver.py with spark-submit in client deploy mode, shipping
# PageRank.py alongside it via --py-files and giving the driver and each
# executor 512m of memory.
# NOTE(review): 'spark://host:7077' looks like a placeholder master URL -- confirm.
!spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
--master 'spark://host:7077' \
--deploy-mode 'client' \
--name 'PageRank' \
--py-files PageRank.py \
--executor-memory '512m' \
--driver-memory '512m' \
PageRankDriver.py 


### Stop Spark context / start or stop HDFS

In [1]:
# Housekeeping cell: only the start-dfs.sh line is active, so running it
# starts the local HDFS daemons. The two commented lines are the matching
# shutdown steps (stop the SparkContext, then stop HDFS); uncomment as needed.
#sc.stop()
#!/usr/local/Cellar/hadoop/2*/sbin/stop-dfs.sh
!/usr/local/Cellar/hadoop/2*/sbin/start-dfs.sh

Starting namenodes on [localhost]
localhost: starting namenode, logging to /usr/local/Cellar/hadoop/2.7.1/libexec/logs/hadoop-leiyang-namenode-Leis-MacBook-Pro.local.out
localhost: starting datanode, logging to /usr/local/Cellar/hadoop/2.7.1/libexec/logs/hadoop-leiyang-datanode-Leis-MacBook-Pro.local.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/Cellar/hadoop/2.7.1/libexec/logs/hadoop-leiyang-secondarynamenode-Leis-MacBook-Pro.local.out
