In [1]:
import pyspark
from pyspark.context import SparkContext
from pyspark import SparkConf
conf = SparkConf()
sc = SparkContext(conf = conf)
sc.setLogLevel("ERROR")

In [2]:
# Load the adjacency list file
AdjList1 = sc.textFile("02AdjacencyList.txt")
print(AdjList1.collect())

['1 2', '2 3 4', '3 4', '4 1 5', '5 3']


In [3]:
AdjList2 = AdjList1.map(lambda line:line.split(" ")) #First lambda function
AdjList2.collect()

[['1', '2'], ['2', '3', '4'], ['3', '4'], ['4', '1', '5'], ['5', '3']]

In [4]:
AdjList3=AdjList2.map(lambda line:(line[0],line[1:])) #Second Lambda function
AdjList3.persist()
print(AdjList3.collect())

[('1', ['2']), ('2', ['3', '4']), ('3', ['4']), ('4', ['1', '5']), ('5', ['3'])]


In [5]:
nNumOfNodes = AdjList3.count()
print("Total Number of nodes " + str(nNumOfNodes))

Total Number of nodes 5


In [6]:
print("Initialization")
PageRankValues = AdjList3.mapValues(lambda x : 1/nNumOfNodes) # third lambda function
print(PageRankValues.collect())

Initialization
[('1', 0.2), ('2', 0.2), ('3', 0.2), ('4', 0.2), ('5', 0.2)]


In [7]:
# Run 30 iterations
print("Run 30 Iterations")
for i in range(1, 30):
    print("Number of Iterations " + str(i))
    JoinRDD = AdjList3.join(PageRankValues)
    print("join results")
    print(JoinRDD.collect())
    contributions = JoinRDD.flatMap(lambda x:[(y,x[1][1]/len(x[1][0])) for y in x[1][0]]) # fourth lambda function
    print("contributions")
    print(contributions.collect())
    accumulations = contributions.reduceByKey(lambda x,y: x+y) # fifth lambda function
    print("accumulations")
    print(accumulations.collect())
    PageRankValues = accumulations.mapValues(lambda x : x*0.85 + 0.15/nNumOfNodes) #sixth lambda function
    print("PageRankValues")
    print(PageRankValues.collect()) 

Run 30 Iterations
Number of Iterations 1
join results
[('4', (['1', '5'], 0.2)), ('3', (['4'], 0.2)), ('1', (['2'], 0.2)), ('2', (['3', '4'], 0.2)), ('5', (['3'], 0.2))]
contributions
[('1', 0.1), ('5', 0.1), ('4', 0.2), ('2', 0.2), ('3', 0.1), ('4', 0.1), ('3', 0.2)]
accumulations
[('4', 0.30000000000000004), ('3', 0.30000000000000004), ('1', 0.1), ('5', 0.1), ('2', 0.2)]
PageRankValues
[('4', 0.28500000000000003), ('3', 0.28500000000000003), ('1', 0.115), ('5', 0.115), ('2', 0.2)]
Number of Iterations 2
join results
[('4', (['1', '5'], 0.28500000000000003)), ('3', (['4'], 0.28500000000000003)), ('2', (['3', '4'], 0.2)), ('1', (['2'], 0.115)), ('5', (['3'], 0.115))]
contributions
[('1', 0.14250000000000002), ('5', 0.14250000000000002), ('4', 0.28500000000000003), ('3', 0.1), ('4', 0.1), ('2', 0.115), ('3', 0.115)]
accumulations
[('4', 0.385), ('3', 0.21500000000000002), ('2', 0.115), ('1', 0.14250000000000002), ('5', 0.14250000000000002)]
PageRankValues
[('4', 0.35724999999999996), ('

[('5', (['3'], 0.15690875476694538)), ('4', (['1', '5'], 0.29716419189983956)), ('2', (['3', '4'], 0.1599719745453669)), ('3', (['4'], 0.2290463240209026)), ('1', (['2'], 0.15690875476694538))]
contributions
[('3', 0.15690875476694538), ('1', 0.14858209594991978), ('5', 0.14858209594991978), ('3', 0.07998598727268345), ('4', 0.07998598727268345), ('4', 0.2290463240209026), ('2', 0.15690875476694538)]
accumulations
[('5', 0.14858209594991978), ('4', 0.30903231129358605), ('2', 0.15690875476694538), ('3', 0.23689474203962885), ('1', 0.14858209594991978)]
PageRankValues
[('5', 0.15629478155743182), ('4', 0.29267746459954813), ('2', 0.16337244155190356), ('3', 0.2313605307336845), ('1', 0.15629478155743182)]
Number of Iterations 14
join results
[('2', (['3', '4'], 0.16337244155190356)), ('5', (['3'], 0.15629478155743182)), ('3', (['4'], 0.2313605307336845)), ('1', (['2'], 0.15629478155743182)), ('4', (['1', '5'], 0.29267746459954813))]
contributions
[('3', 0.08168622077595178), ('4', 0.081

[('4', (['1', '5'], 0.29533150586877155)), ('1', (['2'], 0.15553811832152997)), ('2', (['3', '4'], 0.16233478123781217)), ('3', (['4'], 0.2312574762503561)), ('5', (['3'], 0.15553811832152997))]
contributions
[('1', 0.14766575293438577), ('5', 0.14766575293438577), ('2', 0.15553811832152997), ('3', 0.08116739061890609), ('4', 0.08116739061890609), ('4', 0.2312574762503561), ('3', 0.15553811832152997)]
accumulations
[('4', 0.3124248668692622), ('1', 0.14766575293438577), ('2', 0.15553811832152997), ('3', 0.23670550894043607), ('5', 0.14766575293438577)]
PageRankValues
[('4', 0.29556113683887286), ('1', 0.1555158899942279), ('2', 0.16220740057330046), ('3', 0.23119968259937065), ('5', 0.1555158899942279)]
Number of Iterations 25
join results
[('3', (['4'], 0.23119968259937065)), ('1', (['2'], 0.1555158899942279)), ('4', (['1', '5'], 0.29556113683887286)), ('2', (['3', '4'], 0.16220740057330046)), ('5', (['3'], 0.1555158899942279))]
contributions
[('4', 0.23119968259937065), ('2', 0.15551

In [9]:
print("=== Final PageRankValues ===")
print(PageRankValues.collect())

=== Final PageRankValues ===
[('2', 0.16224457411113138), ('1', 0.15557437429733678), ('4', 0.2954233497371882), ('5', 0.15557437429733678), ('3', 0.2311833275570067)]


In [None]:
# Write out the final ranks
PageRankValues.coalesce(1).saveAsTextFile("file:///C://Users//Sunny//Documents//PageRankValues_Final")