In [6]:
import re
import sys
import time
from typing import Iterable, Tuple
from operator import add
from pyspark.sql import SparkSession
from pyspark.resultiterable import ResultIterable

def parseFunc(urls: str) -> Tuple[str, str]:
    parts = re.split(r'\s+', urls)
    return parts[0], parts[1]

def contribution(urls: ResultIterable[str], rank: float) -> Iterable[Tuple[str, float]]:
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

In [7]:
start = time.process_time()

spark = SparkSession.builder.appName("PythonPageRank").getOrCreate()

lines = spark.read.text("Wiki-Vote.txt").rdd.map(lambda r: r[0])
links = lines.map(lambda urls: parseFunc(urls)).distinct().groupByKey().cache()
ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))

for iteration in range(10):
    contributions = links.join(ranks).flatMap(lambda url_urls_rank: contribution(url_urls_rank[1][0], url_urls_rank[1][1]))
    contributions = links.fullOuterJoin(contributions).mapValues(lambda x : x[1] or 0.0)
    ranks = contributions.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

print("Time elapsed: ", time.process_time() - start, "sec")

ranklist = []
for x in ranks.collect():
    ranklist.append((int(x[0]), x[1]))
newlist = (sorted(ranklist, key = lambda x: x[1], reverse = True))

for l in newlist:
    print(l)

with open("PySpark-core-result.txt", "w") as output:
    for l in newlist:
        output.write(str(l)+'\n')

spark.stop()

Time elapsed:  0.14786375800000018 sec
(4037, 13.825470997583082)
(15, 11.07889897048744)
(6634, 11.041559748794656)
(2625, 9.921976935040517)
(2398, 7.903473688978912)
(2470, 7.528140677465928)
(2237, 7.472881300652041)
(4191, 6.846470267335859)
(7553, 6.546047878364906)
(5254, 6.4778056871146585)
(2328, 6.150855397239033)
(1186, 6.080198484088468)
(1297, 5.874924681067694)
(4335, 5.870047913410364)
(5412, 5.833409964045088)
(7620, 5.830903045868311)
(7632, 5.786301983202687)
(4875, 5.654859753847591)
(6946, 5.58306701573442)
(3352, 5.385830477450187)
(6832, 5.353087835478626)
(2654, 5.30393915572392)
(737, 5.269191834826169)
(762, 5.258003080756099)
(2066, 5.205357390374526)
(3089, 5.155933128104315)
(8293, 5.101995986674424)
(28, 5.051193839610018)
(2535, 5.025541893940849)
(3334, 5.005407071993259)
(214, 4.946765250577678)
(665, 4.900213503741743)
(4735, 4.896419376688686)
(7092, 4.845724793313772)
(6774, 4.835585331155096)
(2565, 4.6864113510872025)
(5484, 4.6617601301512765)
(804