In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
# Start (or reuse) the Spark session shared by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName('Graphs-HW2')
    .getOrCreate()
)

In [15]:
# Read lines from the text file
pr_sdf = spark.read.load('pr_graph.txt', format="text")

#pr_sdf.createOrReplaceTempView('pr_sdf_view')
#pr_sdf = spark.sql("SELECT CAST(split(value, ' ')[0] AS int) AS from_node, CAST(split(value, ' ')[1] AS int) AS to_node, 'not_important' AS edge_type FROM pr_sdf_view")


In [10]:
# Out-weight per source node: every edge leaving a node carries 1 / (its out-degree).
# A native column expression avoids a Python UDF round-trip, and .alias() names the
# result explicitly rather than depending on an auto-generated column name.
helper1 = (
    pr_sdf.groupBy('from_node')
    .count()
    .select('from_node', (F.lit(1.0) / F.col('count')).alias('weight'))
    .sort('from_node')
)

helper1.show()

+---------+------------------+
|from_node|            weight|
+---------+------------------+
|        1|              0.25|
|        2|               0.5|
|        3|               1.0|
|        4|               1.0|
|        5|              0.25|
|        6|               1.0|
|        7|0.3333333333333333|
+---------+------------------+



In [11]:
# Attach each source node's out-weight to every edge leaving it.
helper2 = (
    pr_sdf
    .join(helper1, on='from_node')
    .select('from_node', 'to_node', 'weight')
)
helper2.show()

+---------+-------+------------------+
|from_node|to_node|            weight|
+---------+-------+------------------+
|        1|      2|              0.25|
|        1|      3|              0.25|
|        1|      4|              0.25|
|        1|      5|              0.25|
|        2|      3|               0.5|
|        2|      5|               0.5|
|        3|      2|               1.0|
|        4|      5|               1.0|
|        5|      1|              0.25|
|        5|      6|              0.25|
|        5|      7|              0.25|
|        6|      7|               1.0|
|        7|      6|0.3333333333333333|
|        7|      2|0.3333333333333333|
|        7|      7|0.3333333333333333|
|        5|      4|              0.25|
+---------+-------+------------------+



In [12]:
# Initial ranks: every node starts at 1/N. N counts every node that appears on either
# end of an edge, so a node that only receives links is still ranked and counted.
# The key column stays named from_node so the update cell can join on it.
all_nodes = (
    pr_sdf.select('from_node')
    .union(pr_sdf.select(F.col('to_node').alias('from_node')))
    .distinct()
)
num = all_nodes.count()

helper3 = all_nodes.withColumn('pagerank', F.lit(1 / num)).sort('from_node')
helper3.show()

+---------+-------------------+
|from_node|           pagerank|
+---------+-------------------+
|        1|0.14285714285714285|
|        2|0.14285714285714285|
|        3|0.14285714285714285|
|        4|0.14285714285714285|
|        5|0.14285714285714285|
|        6|0.14285714285714285|
|        7|0.14285714285714285|
+---------+-------------------+



In [13]:
# One PageRank update: new rank = alpha * (sum over in-edges of weight * source rank) + beta.
alpha = 0.85
beta = 0.15

# Rank mass flowing into each node along its in-edges.
inflow = (
    helper2.join(helper3, on='from_node')
    .groupBy('to_node')
    .agg(F.sum(F.col('weight') * F.col('pagerank')).alias('inflow'))
    .withColumnRenamed('to_node', 'from_node')
)

# Outer join with the current rank table so a node with no in-edges still appears
# (with rank exactly beta) instead of silently dropping out of the result.
final = (
    helper3.select('from_node')
    .join(inflow, on='from_node', how='outer')
    .select(
        'from_node',
        (F.lit(alpha) * F.coalesce(F.col('inflow'), F.lit(0.0)) + F.lit(beta)).alias('pagerank'),
    )
    .sort('from_node')
)

final.show()



+---------+-------------------+
|from_node|           pagerank|
+---------+-------------------+
|        1|0.18035714285714285|
|        2|0.34226190476190477|
|        3|0.24107142857142855|
|        4| 0.2107142857142857|
|        5|             0.3625|
|        6|0.22083333333333333|
|        7|0.34226190476190477|
+---------+-------------------+



In [24]:


def _pagerank_edges(G):
    """Return ``G`` as a DataFrame with integer ``from_node`` / ``to_node`` columns.

    Accepts either raw text rows in a ``value`` column of the form
    ``"<from> <to> [extra tokens]"`` (extra tokens are ignored), or a frame that
    already has ``from_node`` / ``to_node`` columns.
    """
    if 'value' in G.columns:
        line = F.trim(F.col('value'))
        tokens = F.split(line, r'\s+')
        edges = (
            G.filter(line != '')  # skip blank lines
            .select(
                tokens.getItem(0).cast('int').alias('from_node'),
                tokens.getItem(1).cast('int').alias('to_node'),
            )
        )
    else:
        edges = G.select('from_node', 'to_node')
    # Drop rows whose endpoints could not be parsed rather than ranking a null node.
    return edges.dropna()


def pagerank(G, num_iter, alpha=0.85, beta=0.15):
    """Run ``num_iter`` rounds of PageRank over an edge list.

    Update rule per round, for every node v:
        rank(v) = alpha * sum(rank(u) / outdeg(u) for each edge u -> v) + beta
    starting from rank = 1/N, where N is the number of distinct nodes on either
    end of an edge. ``beta`` is not divided by N, so the ranks are not a
    probability distribution. Rank held by nodes with no out-edges is not passed
    on. Duplicate edge lines count as separate edges.

    Parameters
    ----------
    G : pyspark.sql.DataFrame
        Raw text lines in a ``value`` column (``"<from> <to> ..."``), or an
        already-parsed frame with ``from_node`` / ``to_node`` columns.
    num_iter : int
        Number of update rounds; ``0`` (or negative) returns the initial ranks.
    alpha : float, default 0.85
        Weight on the rank flowing in along edges.
    beta : float, default 0.15
        Constant added to every node's rank each round.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``node`` and ``pagerank``, one row per node, ordered by ``node``.
        An input with no edges yields an empty frame.
    """
    edges = _pagerank_edges(G)

    # Each edge carries 1/out-degree of its source node.
    weighted = (
        edges.groupBy('from_node').count()
        .join(edges, on='from_node')
        .select('from_node', 'to_node', (F.lit(1.0) / F.col('count')).alias('weight'))
    )

    # Every node on either end of an edge, so nodes that only receive links are ranked.
    nodes = (
        edges.select(F.col('from_node').alias('node'))
        .union(edges.select(F.col('to_node').alias('node')))
        .distinct()
    )
    num_nodes = nodes.count()
    initial = 1.0 / num_nodes if num_nodes else 0.0  # guard against an empty graph
    ranks = nodes.withColumn('pagerank', F.lit(initial))

    # NOTE(review): the query plan grows each round; for large num_iter consider
    # ranks.localCheckpoint() inside the loop to truncate lineage.
    for _ in range(num_iter):
        inflow = (
            weighted
            .join(ranks.withColumnRenamed('node', 'from_node'), on='from_node')
            .groupBy('to_node')
            .agg(F.sum(F.col('weight') * F.col('pagerank')).alias('inflow'))
            .withColumnRenamed('to_node', 'node')
        )
        # Left join from the full node list: a node with no in-edges gets exactly
        # beta and keeps contributing to its targets in later rounds.
        ranks = (
            nodes.join(inflow, on='node', how='left')
            .select(
                'node',
                (F.lit(alpha) * F.coalesce(F.col('inflow'), F.lit(0.0)) + F.lit(beta))
                .alias('pagerank'),
            )
        )

    return ranks.orderBy('node')

## Step 5: Inspect the input edges and run `pagerank` for 5 iterations

In [4]:
# How many edge lines were read from pr_graph.txt.
num_edges = pr_sdf.count()
num_edges

16

In [5]:
# Print the loaded rows (DataFrame.show defaults spelled out: up to 20 rows, long values truncated).
pr_sdf.show(n=20, truncate=True)

+-----+
|value|
+-----+
|1 2 0|
|1 3 0|
|1 4 0|
|1 5 0|
|2 3 0|
|2 5 0|
|3 2 0|
|4 5 0|
|5 1 0|
|5 6 0|
|5 7 0|
|6 7 0|
|7 6 0|
|7 2 0|
|7 7 0|
|5 4 0|
+-----+



In [25]:
# Five PageRank rounds over the sample graph, shown in node order.
ranked_sdf = pagerank(pr_sdf, num_iter=5)
ranked_sdf.orderBy("node").show()

+----+-------------------+
|node|           pagerank|
+----+-------------------+
|   1| 0.3056855115440409|
|   2|  0.843575161873236|
|   3| 0.5262470701160463|
|   4|0.36610546184882914|
|   5| 0.8147069488207835|
|   6| 0.5380753784802238|
|   7| 0.9433725923168398|
+----+-------------------+

