-
Notifications
You must be signed in to change notification settings - Fork 0
/
pagerank.py
62 lines (46 loc) · 2.06 KB
/
pagerank.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import re
import sys
from operator import add
from pyspark.sql import SparkSession
def computeContribs(urls, rank):
    """Yield (url, contribution) pairs, splitting `rank` evenly over `urls`.

    Each neighbor receives rank / len(urls); an empty `urls` yields nothing
    (the division is deferred into the generator, so no ZeroDivisionError).
    """
    count = len(urls)
    yield from ((target, rank / count) for target in urls)
def parseNeighbors(urls):
    """Parse a whitespace-separated "URL neighborURL" line into a pair.

    Uses str.split() rather than re.split(r'\s+', ...): the regex form
    produces an empty first token for lines with leading whitespace
    (re.split(r'\s+', ' a b') -> ['', 'a', 'b']), which silently turned
    '' into a graph node. str.split() ignores leading/trailing whitespace
    and behaves identically on well-formed input.

    Raises IndexError if the line has fewer than two tokens, matching the
    original behavior for malformed input.
    """
    parts = urls.split()
    return parts[0], parts[1]
if __name__ == "__main__":
    # Command line: pagerank <file> <iterations>
    if len(sys.argv) != 3:
        print("Usage: pagerank <file> <iterations>", file=sys.stderr)
        exit(-1)
    print("WARN: This is a naive implementation of PageRank and is given as an example!\n" +
          "Please refer to PageRank implementation provided by graphx",
          file=sys.stderr)

    # Start (or reuse) the Spark session for this job.
    spark = SparkSession.builder.appName("PythonPageRank").getOrCreate()

    # Each input line is one whitespace-separated "URL neighborURL" pair.
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda row: row[0])

    # Adjacency structure: url -> iterable of its distinct neighbor urls.
    links = lines.map(parseNeighbors).distinct().groupByKey().cache()

    # Every source URL starts with rank 1.0.
    ranks = links.map(lambda pair: (pair[0], 1.0))

    # Propagate rank along outgoing links for the requested iterations.
    for _ in range(int(sys.argv[2])):
        # kv is (url, (neighbors, rank)) after the join.
        contribs = links.join(ranks).flatMap(
            lambda kv: computeContribs(kv[1][0], kv[1][1]))
        # Damping factor 0.85 with a 0.15 teleport term.
        ranks = contribs.reduceByKey(add).mapValues(lambda r: r * 0.85 + 0.15)

    # Collect the final ranks to the driver and print them.
    for (link, rank) in ranks.collect():
        print("%s has rank: %s." % (link, rank))

    spark.stop()