In [1]:
 from pyspark.sql import SparkSession
 from pyspark.sql.types import *
 import pyspark.sql.functions as F
 spark = SparkSession.builder.appName('Graphs-HW2').getOrCreate()

In [2]:
# Read lines from the text file
pr_sdf = spark.read.load('pr_graph.txt', format="text")

In [3]:
# convert "Value" column into seperate columns:
# from_node, to_node, edge_type
pr_sdf = pr_sdf.select( \
    F.split(pr_sdf.value, " ")[0].alias("from_node").cast("int"), \
    F.split(pr_sdf.value, " ")[1].alias("to_node").cast("int"), \
    F.split(pr_sdf.value, " ")[2].alias("edge_type"))

In [4]:
# function to return portion of weight given weight column
def propOweight(wval):
    return 1 / wval

sparkPropOweightF = F.udf(propOweight,  DoubleType())

In [5]:
def summation4pagerange(propOfWeight,previous_pagerank):
    beta = 0.15
    alpha = 0.85
    sigma = alpha * ((propOfWeight * float(previous_pagerank)) + beta)
    return(sigma)

sparkSum4pagerank = F.udf(summation4pagerange, DoubleType())

In [6]:
def pagerank(G, num_iter):
    
    #G.repartition(2, "from_node")
    # get proportion of weight dataframe
    propOfWeight_sdf = G.select(\
        G.from_node).groupBy(\
        "from_node").agg(F.count("from_node").alias(\
        "weight"))
    propOfWeight_sdf = propOfWeight_sdf.select( \
        propOfWeight_sdf.from_node.alias("from_node2"), \
        sparkPropOweightF(propOfWeight_sdf.weight).alias('propOfWeight'))
    
    # transfer weights
    map_sdf = propOfWeight_sdf.join(G, \
            propOfWeight_sdf.from_node2 == G.from_node)
    map_sdf = map_sdf.select(map_sdf.from_node, \
        map_sdf.to_node, map_sdf.propOfWeight)
    map_sdf.cache()
    
    # go through base case
    pagerank0=1/G.count()
    pr_values_sdf = G.select(G.to_node.alias("node_id"),
        G.edge_type.alias("pagerank"))
    pr_values_sdf = pr_values_sdf.withColumn('pagerank', F.lit(pagerank0))
    # iterate equation until finished
    if num_iter > 0:
        for i in range(1,num_iter+1):
            pr_values_sdf = pr_values_sdf.join(map_sdf, \
                pr_values_sdf.node_id == map_sdf.to_node)
            pr_values_sdf = pr_values_sdf.select( \
                pr_values_sdf.node_id, \
                sparkSum4pagerank(pr_values_sdf.propOfWeight, \
                pr_values_sdf.pagerank).alias('sum_pagerank'))
            pr_values_sdf = pr_values_sdf.groupBy(\
                "node_id").agg(F.sum("sum_pagerank").alias(\
                "pagerank"))
    return pr_values_sdf
    #return propOfWeight_sdf

## Step 5

In [7]:
pr_sdf.count()

16

In [8]:
pr_sdf.show()

+---------+-------+---------+
|from_node|to_node|edge_type|
+---------+-------+---------+
|        1|      2|        0|
|        1|      3|        0|
|        1|      4|        0|
|        1|      5|        0|
|        2|      3|        0|
|        2|      5|        0|
|        3|      2|        0|
|        4|      5|        0|
|        5|      1|        0|
|        5|      6|        0|
|        5|      7|        0|
|        6|      7|        0|
|        7|      6|        0|
|        7|      2|        0|
|        7|      7|        0|
|        5|      4|        0|
+---------+-------+---------+



In [9]:
pagerank(pr_sdf, 5).orderBy("node_id").show()

+-------+-------------------+
|node_id|           pagerank|
+-------+-------------------+
|      1|0.16186168922805783|
|      2|  7.114950118931851|
|      3| 0.6846588590011595|
|      4|0.44738178247070304|
|      5|  10.04020975711441|
|      6| 0.5097860616265127|
|      7|  7.114950118931851|
+-------+-------------------+

