In [1]:
import csv
import json
import numpy as np
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.rdd import RDD

In [2]:
# Get (or reuse) the Spark session used by the rest of the notebook.
spark = SparkSession.builder.appName("Sparkpagerank").getOrCreate()

# Load the input tables. The CSVs below are read with a header row and inferred column types.
page_df = spark.read.csv("page.csv", header=True, inferSchema=True)
# JSON-lines file; later cells read its `page_id` and `sections` columns.
link_df = spark.read.json("link_annotated_text.jsonl")
item_df = spark.read.csv("item.csv", header=True, inferSchema=True)
property_df = spark.read.csv("property.csv", header=True, inferSchema=True)
# Header row only: no schema inference is requested for this file.
statements_df = spark.read.csv("statements.csv", header=True)

23/11/15 05:35:39 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

In [None]:
# NOTE(review): unfinished fragment. `page_f` is not defined in the visible cells
# (presumably `page_df` was intended), so running this cell would raise NameError
# unless the name is defined elsewhere. Confirm or delete the cell.
page_f

In [3]:
from pyspark.sql.functions import col, explode, collect_list

# One row per (page_id, section): flatten each page's 'sections' array
exploded_df = link_df.select("page_id", explode("sections").alias("section"))

# Keep every section's 'target_page_ids' array next to its source page_id
all_page_target_ids = exploded_df.select("page_id", "section.target_page_ids")

# One row per (page_id, target): flatten the per-section arrays.
# The alias says "item" although the values come from 'target_page_ids'.
exploded_target_ids = all_page_target_ids.select("page_id", explode("target_page_ids").alias("target_item_ids"))

# Regroup by source page, collecting all of its targets (repeats are kept) into one list
page_target_lists = exploded_target_ids.groupBy("page_id").agg(collect_list("target_item_ids").alias("target_item_ids"))

# Show the result
#page_target_lists.show()

# Same two columns as page_target_lists, under the name the following code uses
new_df = page_target_lists.select("page_id", "target_item_ids")

# Preview the first rows of the adjacency lists
new_df.show()


# (page_id, [targets]) pairs; `edges` is not referenced again in the visible cells
edges = new_df.rdd.map(lambda row: (row['page_id'], row['target_item_ids']))
# RDD of the page_ids present in page_target_lists (i.e. those that survived the explode/groupBy above)
vertices_rdd = page_target_lists.select("page_id").rdd.map(lambda row: row[0])

# Row RDD of new_df; mapped to (page_id, degree, targets) tuples in the next cell
rdd = new_df.rdd

                                                                                

+-------+--------------------+
|page_id|     target_item_ids|
+-------+--------------------+
|    330|[5282, 5282, 2005...|
|    656|[19555, 18963787,...|
|    677|[4100885, 63778, ...|
|    705|[206578, 25536, 3...|
|    736|[30001, 25202, 42...|
|    857|[1514856, 357427,...|
|   1010|[35428, 35877901,...|
|   1055|[52303, 28297, 34...|
|   1152|[175724, 7406, 53...|
|   1175|[35251, 1095706, ...|
|   1202|[5213, 454746, 19...|
|   1217|[207114, 18956035...|
|   1241|[2393552, 29833, ...|
|   1336|[2965331, 5971097...|
|   1338|[37071, 8900, 562...|
|   1360|[75462, 75462, 46...|
|   1371|[2611280, 1605489...|
|   1374|[4940, 692866, 43...|
|   1409|[2462183, 1408, 5...|
|   1428|[17730, 247991, 1...|
+-------+--------------------+
only showing top 20 rows





In [4]:

# Define a function to map each row to the desired format
def map_to_desired_format(row):
    """Turn one adjacency row into a ``(page_id, degree, target_item_ids)`` tuple.

    ``row`` must expose ``page_id`` and ``target_item_ids`` attributes; the
    degree is simply the length of the target list (repeated targets count).
    """
    source_page = row.page_id
    outgoing = row.target_item_ids
    return source_page, len(outgoing), outgoing

# Convert every Row of `rdd` into a (page_id, degree, target_item_ids) tuple.
# `result_rdd` is the link structure that later cells pass to divide_M_into_blocks.
result_rdd = rdd.map(map_to_desired_format)

In [None]:
# Vertex count used to normalise the initial ranks (hard-coded).
# NOTE(review): confirm this matches the number of vertices in the loaded data.
N = 5362174

# r_old: every vertex starts at 1/N (not referenced again in the visible cells)
r_old = vertices_rdd.map(lambda x: (x, 1/N))

# Earlier variant, left commented out: initialise r_new with (1-B)/N where B = 0.85 and N = number of vertices
#r_new_rdd = vertices_rdd.map(lambda x: (x, (1-B)/N))
# Active variant: the vertex at zipWithIndex position k gets (k + 1) / N, so the
# starting value differs per vertex.
# NOTE(review): this does not match the (1-B)/N description above. Confirm which is intended.
r_new_rdd = vertices_rdd.zipWithIndex().map(lambda x: (x[0], (x[1] + 1) / N))

def divide_r_new_into_blocks(r_new_rdd: RDD, block_size: int, seed=None):
    """Randomly split ``r_new_rdd`` into ``block_size`` equally weighted RDDs.

    Despite its name, ``block_size`` is the *number of blocks* to produce, not
    the number of elements per block.

    Args:
        r_new_rdd: RDD to split.
        block_size: How many blocks to create; must be at least 1.
        seed: Optional seed forwarded to ``randomSplit``. Pass a fixed value to
            get the same partitioning on every run; ``None`` keeps the previous
            unseeded behaviour.

    Returns:
        The list of RDDs returned by ``randomSplit``, one per block.

    Raises:
        ValueError: If ``block_size`` is smaller than 1 (previously this
            surfaced as a ZeroDivisionError or an empty weight list).
    """
    if block_size < 1:
        raise ValueError(f"block_size must be >= 1, got {block_size}")
    print("divide_r_new_into_blocks", r_new_rdd, block_size)
    weights = [1.0 / block_size] * block_size
    return r_new_rdd.randomSplit(weights, seed)

# Split the initial rank vector into 100 random blocks of equal weight
r_new_blocks = divide_r_new_into_blocks(r_new_rdd, 100)

In [4]:
# Save each block of r_new_blocks to its own text output named r_new_block_<index>
# (index starts at 0), so later cells can reload a block by number with sc.textFile.
for block_number, r_new_block in enumerate(r_new_blocks, start=0):
    r_new_block.saveAsTextFile(f'r_new_block_{block_number}')


                                                                                

In [5]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark import RDD

# Get (or reuse) the Spark session and keep a handle on its SparkContext.
# `sc` is what the code below uses for sc.broadcast(...) and sc.textFile(...).
spark = SparkSession.builder.appName("PageRank").getOrCreate()
sc = spark.sparkContext

def divide_r_new_into_blocks(r_new_rdd: RDD, block_size: int, seed=None):
    """Randomly split ``r_new_rdd`` into ``block_size`` equally weighted RDDs.

    This cell re-declares a function that an earlier cell of the notebook also
    defines; whichever cell runs last wins. Despite its name, ``block_size`` is
    the *number of blocks* to produce, not the number of elements per block.

    Args:
        r_new_rdd: RDD to split.
        block_size: How many blocks to create; must be at least 1.
        seed: Optional seed forwarded to ``randomSplit``. Pass a fixed value to
            get the same partitioning on every run; ``None`` keeps the previous
            unseeded behaviour.

    Returns:
        The list of RDDs returned by ``randomSplit``, one per block.

    Raises:
        ValueError: If ``block_size`` is smaller than 1 (previously this
            surfaced as a ZeroDivisionError or an empty weight list).
    """
    if block_size < 1:
        raise ValueError(f"block_size must be >= 1, got {block_size}")
    print("divide_r_new_into_blocks", r_new_rdd, block_size)
    weights = [1.0 / block_size] * block_size
    return r_new_rdd.randomSplit(weights, seed)


def divide_M_into_blocks(M_rdd: RDD, destination_nodes_set: set):
    """Restrict the link structure to edges that end inside one block.

    Each element of ``M_rdd`` is indexed as ``(node, degree, targets)``. The
    result keeps ``node`` and ``degree`` untouched, narrows ``targets`` to the
    members of ``destination_nodes_set`` and drops elements left with no
    targets at all.
    """
    def keep_block_targets(entry):
        # Narrow the target list; the first two fields pass through unchanged.
        inside = [target for target in entry[2] if target in destination_nodes_set]
        return (entry[0], entry[1], inside)

    narrowed = M_rdd.map(keep_block_targets)
    return narrowed.filter(lambda entry: len(entry[2]) > 0)

def compute_contributions(node, degree, dest_nodes, pageranks_dict_broadcast):
    """Compute the rank share ``node`` sends to each of its destinations.

    Args:
        node: Source node id.
        degree: Out-degree used as the divisor for the node's rank.
        dest_nodes: Destinations that receive a share.
        pageranks_dict_broadcast: Object whose ``.value`` maps node id to its
            current rank (a Spark broadcast of a dict in this notebook).

    Returns:
        ``[(node, 0.0)]`` when ``degree`` is 0; otherwise one
        ``(dest, rank / degree)`` pair per entry of ``dest_nodes``, or an empty
        list when ``node`` has no entry in the rank mapping.
    """
    if degree == 0:
        return [(node, 0.0)]

    # The lookup and the division depend only on `node`, so do them once
    # instead of once per destination inside a try/except.
    try:
        share = pageranks_dict_broadcast.value[node] / degree
    except KeyError:
        # Source node has no rank in this mapping: it contributes nothing.
        return []

    return [(dest, share) for dest in dest_nodes]

def compute_pagerank_for_block(r_new_block: RDD, M_block: RDD, d: float = 0.85, max_iterations: int = 100, convergence_threshold: float = 1e-6):
    """Iterate rank updates for one block until the ranks stop changing.

    Each iteration collects the current ranks to the driver, broadcasts them,
    and builds the next ranks as ``(1 - d) + d * total`` where ``total`` is the
    per-key sum of the entries of ``r_new_block`` and the contributions that
    ``compute_contributions`` derives from ``M_block``.

    Args:
        r_new_block: RDD of ``(node, rank)`` pairs for the block; it is also
            added into the per-key sum on every iteration.
        M_block: RDD of ``(node, degree, targets)`` entries for the block.
        d: Damping factor.
        max_iterations: Upper bound on the number of iterations.
        convergence_threshold: Stop once the largest absolute per-node change
            between two consecutive iterations falls below this value.

    Returns:
        The RDD of ``(node, rank)`` pairs from the last iteration. It is lazy:
        the caller's action evaluates it after the inputs were unpersisted.

    Note:
        Relies on the module-level SparkContext ``sc`` for broadcasting.
    """
    r_new_block.persist()
    M_block.persist()

    try:
        pageranks = r_new_block.map(lambda x: (x[0], x[1]))
        prev_pageranks = None

        for i in range(max_iterations):
            # Ship the current ranks to the executors as a read-only dict.
            pageranks_dict = pageranks.collectAsMap()
            pageranks_dict_broadcast = sc.broadcast(pageranks_dict)

            contributions = r_new_block.union(
                M_block.flatMap(lambda x: compute_contributions(x[0], x[1], x[2], pageranks_dict_broadcast))
            )

            pageranks = contributions.reduceByKey(lambda x, y: x + y).mapValues(lambda x: (1 - d) + d * x)

            if prev_pageranks is not None:
                # Largest per-node movement since the previous iteration.
                max_change = pageranks.join(prev_pageranks).map(lambda x: abs(x[1][0] - x[1][1])).max()
                if max_change < convergence_threshold:
                    print(f"Converged after {i + 1} iterations.")
                    break

            prev_pageranks = pageranks
        else:
            # Loop ran out of iterations without meeting the threshold; say so
            # instead of returning silently.
            print(f"Did not converge within {max_iterations} iterations.")
    finally:
        # Release the cached inputs even if a Spark action above failed.
        r_new_block.unpersist()
        M_block.unpersist()

    return pageranks



23/11/12 04:19:05 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [7]:
import ast

# Blocks handled by this cell (inclusive range 0 to 9)
start_block = 0
end_block = 9

# Iterate over the specified range of blocks
for block_number in range(start_block, end_block + 1):
    # Reload the block saved under r_new_block_<n>. Each line is expected to be
    # the text of a (page_id, rank) tuple; ast.literal_eval parses it as a plain
    # literal, whereas eval would execute whatever expression the file contains.
    r_new_block_path = f'r_new_block_{block_number}'
    r_new_block = sc.textFile(r_new_block_path).map(ast.literal_eval)

    # Node ids that belong to this block
    destination_nodes_set = set(r_new_block.map(lambda x: x[0]).collect())

    # Restrict M to the links that point into this block
    M_block = divide_M_into_blocks(result_rdd, destination_nodes_set)

    # Perform PageRank computation for the current block
    pagerank_results = compute_pagerank_for_block(r_new_block, M_block)

    # Run the Spark job before opening the CSV so the file is only held open
    # for the duration of the write.
    pagerank_rows = pagerank_results.collect()

    # Append this block's ranks; the header is written only when the file is empty
    with open('final_pageranks0.csv', 'a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        if csv_file.tell() == 0:
            csv_writer.writerow(['Node', 'PageRank'])
        csv_writer.writerows(pagerank_rows)

    # Print the block number completed
    print(f"Block {block_number} completed")


                                                                                

Converged after 44 iterations.
Block 0 completed


                                                                                

Converged after 81 iterations.
Block 1 completed


                                                                                

Converged after 32 iterations.
Block 2 completed


                                                                                

Converged after 61 iterations.
Block 3 completed


                                                                                

Converged after 17 iterations.
Block 4 completed


                                                                                

Converged after 38 iterations.
Block 5 completed


                                                                                

Converged after 80 iterations.
Block 6 completed


                                                                                

Converged after 43 iterations.
Block 7 completed


                                                                                

Converged after 62 iterations.
Block 8 completed


                                                                                

Converged after 75 iterations.
Block 9 completed


In [8]:
import ast

# Blocks handled by this cell (inclusive range 10 to 19)
start_block = 10
end_block = 19

# Iterate over the specified range of blocks
for block_number in range(start_block, end_block + 1):
    # Reload the block saved under r_new_block_<n>. Each line is expected to be
    # the text of a (page_id, rank) tuple; ast.literal_eval parses it as a plain
    # literal, whereas eval would execute whatever expression the file contains.
    r_new_block_path = f'r_new_block_{block_number}'
    r_new_block = sc.textFile(r_new_block_path).map(ast.literal_eval)

    # Node ids that belong to this block
    destination_nodes_set = set(r_new_block.map(lambda x: x[0]).collect())

    # Restrict M to the links that point into this block
    M_block = divide_M_into_blocks(result_rdd, destination_nodes_set)

    # Perform PageRank computation for the current block
    pagerank_results = compute_pagerank_for_block(r_new_block, M_block)

    # Run the Spark job before opening the CSV so the file is only held open
    # for the duration of the write.
    pagerank_rows = pagerank_results.collect()

    # Append this block's ranks; the header is written only when the file is empty
    with open('final_pageranks1.csv', 'a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        if csv_file.tell() == 0:
            csv_writer.writerow(['Node', 'PageRank'])
        csv_writer.writerows(pagerank_rows)

    # Print the block number completed
    print(f"Block {block_number} completed")


                                                                                

Converged after 60 iterations.
Block 10 completed


                                                                                

Converged after 58 iterations.
Block 11 completed


                                                                                

Converged after 70 iterations.
Block 12 completed


                                                                                

Converged after 35 iterations.
Block 13 completed


                                                                                

Converged after 49 iterations.
Block 14 completed


                                                                                

Converged after 23 iterations.
Block 15 completed


                                                                                

Converged after 85 iterations.
Block 16 completed


                                                                                

Converged after 40 iterations.
Block 17 completed


                                                                                

Converged after 82 iterations.


                                                                                

Block 18 completed


                                                                                

Converged after 51 iterations.
Block 19 completed


In [7]:
import ast

# Blocks handled by this cell (inclusive range 20 to 29)
start_block = 20
end_block = 29

# Iterate over the specified range of blocks
for block_number in range(start_block, end_block + 1):
    # Reload the block saved under r_new_block_<n>. Each line is expected to be
    # the text of a (page_id, rank) tuple; ast.literal_eval parses it as a plain
    # literal, whereas eval would execute whatever expression the file contains.
    r_new_block_path = f'r_new_block_{block_number}'
    r_new_block = sc.textFile(r_new_block_path).map(ast.literal_eval)

    # Node ids that belong to this block
    destination_nodes_set = set(r_new_block.map(lambda x: x[0]).collect())

    # Restrict M to the links that point into this block
    M_block = divide_M_into_blocks(result_rdd, destination_nodes_set)

    # Perform PageRank computation for the current block
    pagerank_results = compute_pagerank_for_block(r_new_block, M_block)

    # Run the Spark job before opening the CSV so the file is only held open
    # for the duration of the write.
    pagerank_rows = pagerank_results.collect()

    # Append this block's ranks; the header is written only when the file is empty
    with open('final_pageranks2.csv', 'a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        if csv_file.tell() == 0:
            csv_writer.writerow(['Node', 'PageRank'])
        csv_writer.writerows(pagerank_rows)

    # Print the block number completed
    print(f"Block {block_number} completed")


                                                                                

Converged after 37 iterations.
Block 20 completed


                                                                                

Converged after 82 iterations.
Block 21 completed


                                                                                

Converged after 30 iterations.
Block 22 completed


                                                                                

Converged after 34 iterations.
Block 23 completed


                                                                                

Converged after 56 iterations.
Block 24 completed


                                                                                

Converged after 59 iterations.
Block 25 completed


                                                                                

Converged after 39 iterations.
Block 26 completed


                                                                                

Converged after 78 iterations.
Block 27 completed


                                                                                

Converged after 83 iterations.
Block 28 completed


                                                                                

Converged after 16 iterations.
Block 29 completed


In [8]:
import ast

# Blocks handled by this cell (inclusive range 30 to 39)
start_block = 30
end_block = 39

# Iterate over the specified range of blocks
for block_number in range(start_block, end_block + 1):
    # Reload the block saved under r_new_block_<n>. Each line is expected to be
    # the text of a (page_id, rank) tuple; ast.literal_eval parses it as a plain
    # literal, whereas eval would execute whatever expression the file contains.
    r_new_block_path = f'r_new_block_{block_number}'
    r_new_block = sc.textFile(r_new_block_path).map(ast.literal_eval)

    # Node ids that belong to this block
    destination_nodes_set = set(r_new_block.map(lambda x: x[0]).collect())

    # Restrict M to the links that point into this block
    M_block = divide_M_into_blocks(result_rdd, destination_nodes_set)

    # Perform PageRank computation for the current block
    pagerank_results = compute_pagerank_for_block(r_new_block, M_block)

    # Run the Spark job before opening the CSV so the file is only held open
    # for the duration of the write.
    pagerank_rows = pagerank_results.collect()

    # Append this block's ranks; the header is written only when the file is empty
    with open('final_pageranks3.csv', 'a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        if csv_file.tell() == 0:
            csv_writer.writerow(['Node', 'PageRank'])
        csv_writer.writerows(pagerank_rows)

    # Print the block number completed
    print(f"Block {block_number} completed")


                                                                                

Converged after 40 iterations.
Block 30 completed


                                                                                

Converged after 51 iterations.
Block 31 completed


                                                                                

Converged after 68 iterations.
Block 32 completed


                                                                                

Converged after 31 iterations.
Block 33 completed


                                                                                

Converged after 29 iterations.
Block 34 completed


                                                                                

Converged after 38 iterations.
Block 35 completed


                                                                                

Converged after 16 iterations.
Block 36 completed


                                                                                

Converged after 67 iterations.
Block 37 completed


                                                                                

Converged after 82 iterations.
Block 38 completed


                                                                                

Converged after 28 iterations.
Block 39 completed


In [7]:
import ast

# Blocks handled by this cell (inclusive range 40 to 49)
start_block = 40
end_block = 49

# Iterate over the specified range of blocks
for block_number in range(start_block, end_block + 1):
    # Reload the block saved under r_new_block_<n>. Each line is expected to be
    # the text of a (page_id, rank) tuple; ast.literal_eval parses it as a plain
    # literal, whereas eval would execute whatever expression the file contains.
    r_new_block_path = f'r_new_block_{block_number}'
    r_new_block = sc.textFile(r_new_block_path).map(ast.literal_eval)

    # Node ids that belong to this block
    destination_nodes_set = set(r_new_block.map(lambda x: x[0]).collect())

    # Restrict M to the links that point into this block
    M_block = divide_M_into_blocks(result_rdd, destination_nodes_set)

    # Perform PageRank computation for the current block
    pagerank_results = compute_pagerank_for_block(r_new_block, M_block)

    # Run the Spark job before opening the CSV so the file is only held open
    # for the duration of the write.
    pagerank_rows = pagerank_results.collect()

    # Append this block's ranks; the header is written only when the file is empty
    with open('final_pageranks4.csv', 'a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        if csv_file.tell() == 0:
            csv_writer.writerow(['Node', 'PageRank'])
        csv_writer.writerows(pagerank_rows)

    # Print the block number completed
    print(f"Block {block_number} completed")


                                                                                

Converged after 55 iterations.


                                                                                

Block 40 completed


                                                                                

Converged after 30 iterations.
Block 41 completed


                                                                                

Converged after 39 iterations.
Block 42 completed


                                                                                

Converged after 42 iterations.
Block 43 completed


                                                                                

Converged after 35 iterations.
Block 44 completed


                                                                                

Converged after 57 iterations.
Block 45 completed


                                                                                

Converged after 69 iterations.
Block 46 completed


                                                                                

Converged after 83 iterations.
Block 47 completed


                                                                                

Converged after 37 iterations.
Block 48 completed


                                                                                

Converged after 85 iterations.
Block 49 completed


In [8]:
import ast

# Blocks handled by this cell (inclusive range 50 to 59)
start_block = 50
end_block = 59

# Iterate over the specified range of blocks
for block_number in range(start_block, end_block + 1):
    # Reload the block saved under r_new_block_<n>. Each line is expected to be
    # the text of a (page_id, rank) tuple; ast.literal_eval parses it as a plain
    # literal, whereas eval would execute whatever expression the file contains.
    r_new_block_path = f'r_new_block_{block_number}'
    r_new_block = sc.textFile(r_new_block_path).map(ast.literal_eval)

    # Node ids that belong to this block
    destination_nodes_set = set(r_new_block.map(lambda x: x[0]).collect())

    # Restrict M to the links that point into this block
    M_block = divide_M_into_blocks(result_rdd, destination_nodes_set)

    # Perform PageRank computation for the current block
    pagerank_results = compute_pagerank_for_block(r_new_block, M_block)

    # Run the Spark job before opening the CSV so the file is only held open
    # for the duration of the write.
    pagerank_rows = pagerank_results.collect()

    # Append this block's ranks; the header is written only when the file is empty
    with open('final_pageranks5.csv', 'a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        if csv_file.tell() == 0:
            csv_writer.writerow(['Node', 'PageRank'])
        csv_writer.writerows(pagerank_rows)

    # Print the block number completed
    print(f"Block {block_number} completed")


                                                                                

Converged after 23 iterations.
Block 50 completed


                                                                                

Converged after 28 iterations.
Block 51 completed


                                                                                

Converged after 40 iterations.
Block 52 completed


                                                                                

Converged after 29 iterations.
Block 53 completed


                                                                                

Converged after 19 iterations.
Block 54 completed


                                                                                

Converged after 30 iterations.
Block 55 completed


                                                                                

Converged after 54 iterations.
Block 56 completed


                                                                                

Converged after 39 iterations.
Block 57 completed


                                                                                

Converged after 43 iterations.
Block 58 completed


                                                                                

Converged after 80 iterations.
Block 59 completed


In [9]:
import ast

# Blocks handled by this cell (inclusive range 60 to 69)
start_block = 60
end_block = 69

# Iterate over the specified range of blocks
for block_number in range(start_block, end_block + 1):
    # Reload the block saved under r_new_block_<n>. Each line is expected to be
    # the text of a (page_id, rank) tuple; ast.literal_eval parses it as a plain
    # literal, whereas eval would execute whatever expression the file contains.
    r_new_block_path = f'r_new_block_{block_number}'
    r_new_block = sc.textFile(r_new_block_path).map(ast.literal_eval)

    # Node ids that belong to this block
    destination_nodes_set = set(r_new_block.map(lambda x: x[0]).collect())

    # Restrict M to the links that point into this block
    M_block = divide_M_into_blocks(result_rdd, destination_nodes_set)

    # Perform PageRank computation for the current block
    pagerank_results = compute_pagerank_for_block(r_new_block, M_block)

    # Run the Spark job before opening the CSV so the file is only held open
    # for the duration of the write.
    pagerank_rows = pagerank_results.collect()

    # Append this block's ranks; the header is written only when the file is empty
    with open('final_pageranks6.csv', 'a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        if csv_file.tell() == 0:
            csv_writer.writerow(['Node', 'PageRank'])
        csv_writer.writerows(pagerank_rows)

    # Print the block number completed
    print(f"Block {block_number} completed")


                                                                                

Converged after 85 iterations.
Block 60 completed


                                                                                

Converged after 51 iterations.
Block 61 completed


                                                                                

Converged after 63 iterations.
Block 62 completed


                                                                                

Converged after 69 iterations.


                                                                                

Block 63 completed


                                                                                

Converged after 47 iterations.


                                                                                

Block 64 completed


                                                                                

Converged after 84 iterations.


                                                                                

Block 65 completed


                                                                                

Converged after 47 iterations.


                                                                                

Block 66 completed


                                                                                

Converged after 36 iterations.


                                                                                

Block 67 completed


                                                                                

Converged after 73 iterations.
Block 68 completed


                                                                                

Converged after 81 iterations.
Block 69 completed


In [6]:
import ast

# Blocks handled by this cell (inclusive range 70 to 79)
start_block = 70
end_block = 79

# Iterate over the specified range of blocks
for block_number in range(start_block, end_block + 1):
    # Reload the block saved under r_new_block_<n>. Each line is expected to be
    # the text of a (page_id, rank) tuple; ast.literal_eval parses it as a plain
    # literal, whereas eval would execute whatever expression the file contains.
    r_new_block_path = f'r_new_block_{block_number}'
    r_new_block = sc.textFile(r_new_block_path).map(ast.literal_eval)

    # Node ids that belong to this block
    destination_nodes_set = set(r_new_block.map(lambda x: x[0]).collect())

    # Restrict M to the links that point into this block
    M_block = divide_M_into_blocks(result_rdd, destination_nodes_set)

    # Perform PageRank computation for the current block
    pagerank_results = compute_pagerank_for_block(r_new_block, M_block)

    # Run the Spark job before opening the CSV so the file is only held open
    # for the duration of the write.
    pagerank_rows = pagerank_results.collect()

    # Append this block's ranks; the header is written only when the file is empty
    with open('final_pageranks7.csv', 'a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        if csv_file.tell() == 0:
            csv_writer.writerow(['Node', 'PageRank'])
        csv_writer.writerows(pagerank_rows)

    # Print the block number completed
    print(f"Block {block_number} completed")


                                                                                

Converged after 43 iterations.
Block 70 completed


                                                                                

Converged after 81 iterations.
Block 71 completed


                                                                                

Converged after 34 iterations.
Block 72 completed


                                                                                

Converged after 60 iterations.
Block 73 completed


                                                                                

Converged after 36 iterations.
Block 74 completed


                                                                                

Converged after 44 iterations.
Block 75 completed


                                                                                

Converged after 17 iterations.
Block 76 completed


                                                                                

Converged after 51 iterations.
Block 77 completed


                                                                                

Converged after 24 iterations.
Block 78 completed


                                                                                

Converged after 64 iterations.
Block 79 completed


In [7]:
import ast

# Blocks handled by this cell (inclusive range 80 to 89)
start_block = 80
end_block = 89

# Iterate over the specified range of blocks
for block_number in range(start_block, end_block + 1):
    # Reload the block saved under r_new_block_<n>. Each line is expected to be
    # the text of a (page_id, rank) tuple; ast.literal_eval parses it as a plain
    # literal, whereas eval would execute whatever expression the file contains.
    r_new_block_path = f'r_new_block_{block_number}'
    r_new_block = sc.textFile(r_new_block_path).map(ast.literal_eval)

    # Node ids that belong to this block
    destination_nodes_set = set(r_new_block.map(lambda x: x[0]).collect())

    # Restrict M to the links that point into this block
    M_block = divide_M_into_blocks(result_rdd, destination_nodes_set)

    # Perform PageRank computation for the current block
    pagerank_results = compute_pagerank_for_block(r_new_block, M_block)

    # Run the Spark job before opening the CSV so the file is only held open
    # for the duration of the write.
    pagerank_rows = pagerank_results.collect()

    # Append this block's ranks; the header is written only when the file is empty
    with open('final_pageranks8.csv', 'a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        if csv_file.tell() == 0:
            csv_writer.writerow(['Node', 'PageRank'])
        csv_writer.writerows(pagerank_rows)

    # Print the block number completed
    print(f"Block {block_number} completed")


                                                                                

Converged after 22 iterations.
Block 80 completed


                                                                                

Converged after 75 iterations.
Block 81 completed


                                                                                

Converged after 80 iterations.
Block 82 completed


                                                                                

Converged after 67 iterations.
Block 83 completed


                                                                                

Converged after 29 iterations.
Block 84 completed


                                                                                

Converged after 43 iterations.
Block 85 completed


                                                                                

Converged after 16 iterations.
Block 86 completed


                                                                                

Converged after 54 iterations.
Block 87 completed


                                                                                

Converged after 39 iterations.
Block 88 completed


                                                                                

Converged after 77 iterations.
Block 89 completed


In [8]:
import ast

# Blocks handled by this cell (inclusive range 90 to 99)
start_block = 90
end_block = 99

# Iterate over the specified range of blocks
for block_number in range(start_block, end_block + 1):
    # Reload the block saved under r_new_block_<n>. Each line is expected to be
    # the text of a (page_id, rank) tuple; ast.literal_eval parses it as a plain
    # literal, whereas eval would execute whatever expression the file contains.
    r_new_block_path = f'r_new_block_{block_number}'
    r_new_block = sc.textFile(r_new_block_path).map(ast.literal_eval)

    # Node ids that belong to this block
    destination_nodes_set = set(r_new_block.map(lambda x: x[0]).collect())

    # Restrict M to the links that point into this block
    M_block = divide_M_into_blocks(result_rdd, destination_nodes_set)

    # Perform PageRank computation for the current block
    pagerank_results = compute_pagerank_for_block(r_new_block, M_block)

    # Run the Spark job before opening the CSV so the file is only held open
    # for the duration of the write.
    pagerank_rows = pagerank_results.collect()

    # Append this block's ranks; the header is written only when the file is empty
    with open('final_pageranks9.csv', 'a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        if csv_file.tell() == 0:
            csv_writer.writerow(['Node', 'PageRank'])
        csv_writer.writerows(pagerank_rows)

    # Print the block number completed
    print(f"Block {block_number} completed")


                                                                                

Converged after 27 iterations.
Block 90 completed


                                                                                

Converged after 44 iterations.
Block 91 completed


                                                                                

Converged after 25 iterations.
Block 92 completed


                                                                                

Converged after 37 iterations.
Block 93 completed


                                                                                

Converged after 47 iterations.
Block 94 completed


                                                                                

Converged after 30 iterations.
Block 95 completed


                                                                                

Converged after 45 iterations.
Block 96 completed


                                                                                

Converged after 59 iterations.
Block 97 completed


                                                                                

Converged after 37 iterations.
Block 98 completed


                                                                                

Converged after 50 iterations.
Block 99 completed


In [3]:
import csv

# Number of leading blocks of r_new_blocks to process in this run
num_blocks_to_run = 5

# Start from a fresh file that contains only the header row. The handle is
# closed right away: the previous version kept a 'w' handle open, rebound the
# same name inside the loop and therefore never closed the original explicitly.
with open('final_pageranks0.csv', 'w', newline='') as csv_file:
    csv.writer(csv_file).writerow(['Node', 'PageRank'])

# Iterate over the first num_blocks_to_run in-memory blocks
for block_number, r_new_block in enumerate(r_new_blocks[:num_blocks_to_run], start=0):
    # Node ids that belong to this block
    destination_nodes_set = set(r_new_block.map(lambda x: x[0]).collect())

    # Restrict M to the links that point into this block
    M_block = divide_M_into_blocks(result_rdd, destination_nodes_set)

    # Perform PageRank computation for the current block
    pagerank_results = compute_pagerank_for_block(r_new_block, M_block)

    # Run the Spark job first, then append its rows in one short-lived open
    pagerank_rows = pagerank_results.collect()
    with open('final_pageranks0.csv', 'a', newline='') as csv_file:
        csv.writer(csv_file).writerows(pagerank_rows)

    # Print the block number completed
    print(f"Block {block_number} completed")


                                                                                

Converged after 31 iterations.
Block 0 completed


                                                                                

Converged after 77 iterations.
Block 1 completed


                                                                                

Converged after 37 iterations.
Block 2 completed


                                                                                

Converged after 43 iterations.
Block 3 completed


                                                                                

Converged after 49 iterations.
Block 4 completed
