In [1]:
import os

import findspark

# findspark.init() has to run before `import pyspark`: it is what makes the
# Spark installation importable. Honor SPARK_HOME when it is set so the
# notebook runs on other machines; otherwise fall back to the original
# local install path.
findspark.init(os.environ.get("SPARK_HOME") or r"C:\spark-2.4.5-bin-hadoop2.7")

import pyspark
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql import SparkSession

# Create Spark Context & Session

In [2]:
# Disabled alternative: build a SparkContext directly instead of going through a session.
# sc = pyspark.SparkContext(appName="experiment")
# Get the SparkSession (no extra builder options are configured here).
spark = SparkSession.builder.getOrCreate()
# Bare expression as the last line so the notebook displays the session.
spark

In [3]:
# SparkContext belonging to the session; used further down for sc.parallelize.
sc = spark.sparkContext
# Bare expression as the last line so the notebook displays the context.
sc

# PageRank Example

In [4]:
# Link graph as [page, [pages it links to]] pairs
pageLinks = [
    ['a', ['b', 'c', 'd']],
    ['c', ['b']],
    ['b', ['d', 'c']],
    ['d', ['a', 'c']],
]

# Every page starts with rank 1, listed in the same order as pageLinks
pageRanks = [[page, 1] for page, _ in pageLinks]

# Number of rank-update iterations to run
numIter = 3
# Damping factor
s = 0.85

def rankContribution(uris, rank):
    """Split a page's rank evenly across the pages it links to.

    Args:
        uris: sized collection of the page's outgoing links.
        rank: the page's current rank (anything accepted by ``float``).

    Returns:
        A list of ``(uri, contribution)`` tuples, one per entry in ``uris``,
        where every contribution equals ``rank / len(uris)``. A page with no
        outgoing links contributes nothing, so the result is an empty list.
    """
    numberOfUris = len(uris)
    if numberOfUris == 0:
        # No outgoing links: avoid dividing by zero, nothing to hand out
        return []
    rank_contribution = float(rank) / numberOfUris
    return [(uri, rank_contribution) for uri in uris]

In [5]:
# The link lists are joined again in every iteration, so keep them cached
pageLinksRDD = sc.parallelize(pageLinks, 2).cache()
pageRanksRDD = sc.parallelize(pageRanks, 2)

# A zero contribution for every page in pageLinks. Without it, a page that
# nobody links to never shows up in the reduceByKey result, drops out of
# pageRanksRDD, and stops contributing after the first iteration.
zeroContribRDD = pageLinksRDD.mapValues(lambda _: 0.0).cache()

for i in range(numIter):
    # Create joined RDD: (page, (links, rank))
    linksRank = pageLinksRDD.join(pageRanksRDD)

    # Calculate the contribution each page sends to every page it links to
    contributedRDD = linksRank.flatMap(lambda x: rankContribution(x[1][0], x[1][1]))

    # Total contribution received per page (0.0 for pages with no inbound links)
    sumRanks = contributedRDD.union(zeroContribRDD) \
                             .reduceByKey(lambda v1, v2: v1 + v2)

    # Update page ranks: (1 - s) + s * total received contribution
    pageRanksRDD = sumRanks.mapValues(lambda total: (1 - s) + s * total)

pageRanksRDD.collect()

[('a', 0.5147916666666666),
 ('b', 1.335513888888889),
 ('c', 1.2572430555555556),
 ('d', 0.8924513888888889)]

In [6]:
df_page_links = pd.DataFrame(pageLinks, columns=['index', 'link']).set_index('index')
df_page_ranks = pd.DataFrame(pageRanks, columns=['index', 'rank']).set_index('index')

for i in range(numIter):
    # Pair every page's outgoing links with its current rank
    df_links_rank = df_page_links.join(df_page_ranks)

    # Flatten the per-page contribution lists into one list of
    # (page, contribution) tuples
    contributions = [
        contribution
        for links, rank in zip(df_links_rank['link'], df_links_rank['rank'])
        for contribution in rankContribution(links, rank)
    ]

    # Total contribution received per page
    df_sum_contributions = pd.DataFrame(contributions, columns=['index', 'rank']) \
                             .groupby('index').sum()

    # Pages nobody links to are missing from the grouped sums. Give them a
    # total of 0 so the index-aligned assignment below does not set their
    # rank to NaN (which would then spread to every page they link to).
    total_contributions = df_sum_contributions['rank'] \
                             .reindex(df_page_ranks.index, fill_value=0.0)

    # Update page ranks: (1 - s) + s * total received contribution
    df_page_ranks['rank'] = (1 - s) + s * total_contributions

list(zip(df_page_ranks.index, df_page_ranks['rank']))

[('a', 0.5147916666666666),
 ('c', 1.2572430555555556),
 ('b', 1.335513888888889),
 ('d', 0.8924513888888889)]