# Page rank

### Handle dependencies

In [1]:
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable


### Handle imports & context

In [2]:
import json
from pyspark.sql import SparkSession
from math import sqrt
from time import time

In [3]:
# Value handed to Spark's 'spark.default.parallelism' setting
MAX_PARTITIONS = 12

# getOrCreate() returns the already running session when there is one
spark = (
    SparkSession.builder
    .config('spark.default.parallelism', MAX_PARTITIONS)
    .getOrCreate()
)
context = spark.sparkContext

23/06/30 01:46:43 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.1.164 instead (on interface enp0s31f6)
23/06/30 01:46:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/30 01:46:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Load graph

In [4]:
GRAPH_PATH = 'graph.json'


# Decode explicitly as UTF-8 so the result does not depend on the locale
with open(GRAPH_PATH, 'r', encoding='utf-8') as file:
    data = json.load(file)

# The file is fully parsed at this point, so the handle is already closed
# while the 'nodes' and 'edges' entries are distributed as RDDs
nodes = context.parallelize(data['nodes'])
edges = context.parallelize(data['edges'])

### Define functions to be used

In [5]:
def merge_rdds(rdd1, rdd2, opt):
    """
        Input:
            - rdd1: RDD
            - rdd2: RDD
            - opt: function taking two values and returning one
        Output:
            - merged_rdds: RDD
        Concatenate both RDDs of (key, value) elements and combine the
        values that share a key by means of opt.
    """
    def combine(left, right):
        return opt(left, right)

    return rdd1.union(rdd2).reduceByKey(combine)


def minus(x, y):
    """Return x with y subtracted from it."""
    difference = x - y
    return difference


def add(x, y):
    """Return the sum of x and y."""
    total = x + y
    return total


def initialize_page_rank(nodes):
    """
        Input:
            - nodes: RDD
        Output:
            - page_rank: RDD
        Build the starting page rank: every node is paired with the same
        value, 1 / |nodes|, giving elements of the form (node, rank).
    """
    total = nodes.count()

    def with_uniform_rank(node):
        # The division stays in here so it only runs when a node exists
        return (node, 1 / total)

    return nodes.map(with_uniform_rank)


def divide_page_rank(page_rank, edges):
    """
        Input:
            - page_rank: RDD
            - edges: RDD
        Output:
            - divided_page_rank: RDD
        page_rank is composed of elements (node, rank) and the first item of
        every edge is its source node. The result holds, for every node, its
        rank divided by the number of edges leaving it. A node without
        outgoing edges is divided by 1, so its rank is kept as is.
    """
    def at_least_one(pair):
        # Nodes without outgoing edges would otherwise divide by zero
        return pair if pair[1] != 0 else (pair[0], 1)

    edge_ones = edges.map(lambda edge: (edge[0], 1))
    # Every node contributes a zero so that it shows up in the counts
    zero_counts = page_rank.map(lambda pair: (pair[0], 0))
    out_degrees = zero_counts.union(edge_ones).reduceByKey(
        lambda left, right: left + right
    ).map(at_least_one)
    # After the join every element is (node, (out_degree, rank))
    ranked_degrees = out_degrees.join(page_rank)
    return ranked_degrees.map(
        lambda pair: (pair[0], pair[1][1] / pair[1][0])
    )


def generate_messages(node_messages, edges):
    """
        Input:
            - node_messages: RDD
            - edges: RDD
        Output:
            - messages: RDD
        node_messages is composed of elements (node, message). Every edge
        leaving a node carries that node's message, so the result is
        composed of elements (target_node, message), one per matching edge.
    """
    def to_target(elem):
        # elem is (source_node, (target_node, message)) after the join
        routed = elem[1]
        return (routed[0], routed[1])

    return edges.join(node_messages).map(to_target)


def send_messages(messages):
    """
        Input:
            - messages: RDD
        Output:
            - merged_messages: RDD
        Add up the values of all messages addressed to the same node,
        leaving one element per node present in messages.
    """
    def accumulate(total, incoming):
        return total + incoming

    merged_messages = messages.reduceByKey(accumulate)
    return merged_messages


def update_page_rank(page_rank, edges, damping_factor):
    """
        Input:
            - page_rank: RDD
            - edges: RDD
            - damping_factor: number
        Output:
            - new_page_rank: RDD
        Perform one page rank step. Every node splits its rank among its
        outgoing edges, the shares arriving at each node are added up and
        the new rank of a node is
        received * damping_factor + (1 - damping_factor) / |page_rank|.
    """
    # Share of rank each node emits along every outgoing edge
    shares = divide_page_rank(page_rank, edges)
    received = send_messages(generate_messages(shares, edges))
    n_nodes = page_rank.count()

    def damp(elem):
        node, incoming = elem[0], elem[1]
        return (
            node,
            (incoming * damping_factor) + (1 - damping_factor) / n_nodes
        )

    # Zero entries keep the nodes that received no message in the result
    zeros = page_rank.map(lambda elem: (elem[0], 0))
    return merge_rdds(zeros, received, add).map(damp)


def page_rank_done(page_rank, new_page_rank, delta):
    """
        Input:
            - page_rank: RDD
            - new_page_rank: RDD
            - delta: number
        Output:
            - done: bool
        Both RDDs are composed of elements (node, rank). The page rank is
        done when, for every node present in both, the absolute difference
        between the old and the new rank is at most delta. With no nodes
        at all the result is True.
    """
    # Pair old and new rank per node: elements become (node, (old, new)).
    # A join keeps the subtraction explicit instead of relying on the order
    # in which a reduceByKey would feed a non-commutative operation.
    paired_ranks = page_rank.join(new_page_rank)
    within_delta = paired_ranks.map(
        lambda elem: abs(elem[1][0] - elem[1][1]) <= delta
    )
    # fold() starts from True, so an empty RDD yields True instead of the
    # error reduce() raises when there is nothing to reduce
    return within_delta.fold(True, lambda x, y: x and y)


def run_page_rank(nodes, edges, damping_factor=0.85, delta=None):
    """
        Input:
            - nodes: RDD
            - edges: RDD
            - damping_factor: number, 0.85 by default
            - delta: number or None; None means 1 / (10 * |nodes|)
        Output:
            - page_rank: list
        Repeat page rank steps until no node changes its rank by more than
        delta between two consecutive steps, printing the time taken by
        every step. The result is the collected list of (node, rank)
        elements; it is empty when there are no nodes.
    """
    n_nodes = nodes.count()
    if n_nodes == 0:
        # Nothing to rank; this also avoids dividing by zero for delta
        return []
    if delta is None:
        delta = 1 / (n_nodes * 10)
    page_rank = initialize_page_rank(nodes)
    iteration = 1
    pr_start = time()
    while True:
        start = time()
        # The new ranks are read by the convergence check and again by the
        # next step, so keep them instead of recomputing them every time
        new_page_rank = update_page_rank(
            page_rank, edges, damping_factor
        ).cache()
        done = page_rank_done(page_rank, new_page_rank, delta)
        # The previous ranks are not read again after this point
        page_rank.unpersist()
        if done:
            break
        page_rank = new_page_rank
        print(f'Iteration {iteration} finished in {time() - start:.3f} sec.')
        iteration += 1
    print(f'Page Rank finished in {time() - pr_start:.3f}')
    return new_page_rank.collect()

In [6]:
# Run the computation on the loaded graph; the name is bound to the collected
# result returned by run_page_rank (a local list, no longer an RDD)
page_rank = run_page_rank(nodes, edges)

                                                                                

Iteration 1 finished in 4.249 sec.


                                                                                

Iteration 2 finished in 3.335 sec.




Iteration 3 finished in 3.365 sec.
Iteration 4 finished in 3.248 sec.


                                                                                

Iteration 5 finished in 3.281 sec.


                                                                                

Iteration 6 finished in 3.189 sec.


                                                                                

Iteration 7 finished in 3.167 sec.


                                                                                

Iteration 8 finished in 3.208 sec.
Iteration 9 finished in 3.127 sec.




Page Rank finished in 33.435


                                                                                

In [7]:
# Print the collected (node, rank) pairs as a left-aligned two-column table
print(f"{'node':<10} | {'rank':<10}")
print(f"{'-' * 11}+{'-' * 11}")
for node, rank in page_rank:
    # Ranks are rounded to 8 decimal places for display only
    print(f'{node:<10} | {round(rank, 8):<10}')

node       | rank      
-----------+-----------
12         | 0.00039936
24         | 0.00055975
36         | 0.00038588
48         | 0.00080861
60         | 0.00015   
72         | 0.00018188
84         | 0.00015   
96         | 0.00021375
108        | 0.00015   
120        | 0.00015   
132        | 0.000303  
144        | 0.00015   
156        | 0.00047349
168        | 0.00015   
180        | 0.00030212
192        | 0.00015   
204        | 0.00015   
216        | 0.00015   
228        | 0.00015   
240        | 0.00015   
252        | 0.0001925 
264        | 0.00015   
276        | 0.00046413
288        | 0.00015   
300        | 0.00015   
312        | 0.00015   
324        | 0.0002775 
336        | 0.00015   
348        | 0.00015   
360        | 0.00015   
372        | 0.00015   
384        | 0.00055279
396        | 0.0002775 
408        | 0.0001925 
420        | 0.00075254
432        | 0.00034917
444        | 0.00037505
456        | 0.00015   
468        | 0.00015   
480        | 0.0