# Direction Lab - Spark 02

In [4]:
from pyspark.sql import SparkSession
from operator import add

def compute_contributions(url_ranks):
    """Calculate contributions to linked pages."""
    url, (link_list, rank) = url_ranks
    num_links = len(link_list)
    if num_links == 0:
        return []
    
    for link in link_list:
        yield (link, rank / num_links)

def pagerank(web_graph, damping_factor=0.85, max_iteration=20, tolerance=0.001):
    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("PageRank Example") \
        .getOrCreate()

    # Get the Spark context from the session
    sc = spark.sparkContext

        # Parallelize the web graph data
    links = sc.parallelize(web_graph)

    # Create an RDD of (page, links) pairs
    pages = links.map(lambda x: x[0]).distinct()
    N = pages.count() # total number of pages

    ranks = pages.map(lambda page: (page, 1.0))

    # Calculate out-degree for each page
    out_degree = links.map(lambda x: (x[0], len(x[1])))

    # Main PageRank iteration
    for iteration in range(max_iteration):
        # Join ranks with links to compute contributions
        contributions = links.join(ranks).flatMap(compute_contributions)
        
        # Calculate new ranks
        new_ranks = contributions.reduceByKey(add).mapValues(lambda rank: (1 - damping_factor) / N + damping_factor * rank)
        
        # Check for convergence
        rank_diffs = new_ranks.join(ranks).map(lambda x: abs(x[1][0] - x[1][1])).reduce(add)
        print(f"Iteration {iteration + 1}: Total rank difference = {rank_diffs}")

        ranks = new_ranks

        if  rank_diffs < tolerance:
            print(f"Converged after {iteration} iterations.")
            break
    
    return spark, ranks

In [5]:
# Sample web graph data: (page, [list of pages it points to])
web_graph = [
    ("A", ["B", "C"]),
    ("B", ["C"]),
    ("C", ["A"]),
    ("D", ["C"]),
]

In [6]:
# Parameters
damping_factor = 0.85 # typical value for PageRank
max_iterations = 20
tolerance = 0.001

In [7]:
spark, ranks = pagerank(web_graph, damping_factor=damping_factor, max_iteration=max_iterations, tolerance=tolerance)

# Collect and print the final ranks
final_ranks = ranks.collect()
print("\nFinal Page Ranks Score:")
for page, rank in final_ranks:
    print(f"Page: {page}, Rank: {rank:.4f}")

# Stop the Spark session
spark.stop()

25/04/10 08:17:28 WARN Utils: Your hostname, vishnu resolves to a loopback address: 127.0.1.1; using 103.16.62.251 instead (on interface eno8303)
25/04/10 08:17:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/10 08:17:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Iteration 1: Total rank difference = 1.8125


                                                                                

Iteration 2: Total rank difference = 2.3906250000000004


                                                                                

Iteration 3: Total rank difference = 1.9507500000000004


                                                                                

Iteration 4: Total rank difference = 0.9442171875000003


                                                                                

Iteration 5: Total rank difference = 0.5285313281250004


                                                                                

Iteration 6: Total rank difference = 0.3535776708984376


                                                                                

Iteration 7: Total rank difference = 0.3005410202636719


                                                                                

Iteration 8: Total rank difference = 0.2058705988806152


                                                                                

Iteration 9: Total rank difference = 0.1328391309565431


                                                                                

Iteration 10: Total rank difference = 0.11291326131306156


                                                                                

Iteration 11: Total rank difference = 0.09597627211610238


                                                                                

Iteration 12: Total rank difference = 0.08157983129868684


                                                                                

Iteration 13: Total rank difference = 0.06934285660388387


                                                                                

Iteration 14: Total rank difference = 0.05894142811330133


                                                                                

Iteration 15: Total rank difference = 0.05010021389630606


                                                                                

Iteration 16: Total rank difference = 0.04258518181186016


                                                                                

Iteration 17: Total rank difference = 0.03619740454008108


                                                                                

Iteration 18: Total rank difference = 0.03076779385906897


                                                                                

Iteration 19: Total rank difference = 0.026152624780208605


                                                                                

Iteration 20: Total rank difference = 0.0222297310631773


                                                                                


Final Page Ranks Score:
Page: B, Rank: 0.1863
Page: C, Rank: 0.3485
Page: A, Rank: 0.3413


In [None]:
web_graph = [
    ("A", ["B", "C"]),
]
# Parameters
damping_factor = 0.85 # typical value for PageRank
max_iterations = 20
tolerance = 0.001

spark, ranks = pagerank(web_graph, damping_factor=damping_factor, max_iteration=max_iterations, tolerance=tolerance)
# Collect and print the final ranks
final_ranks = ranks.collect()
print("\nFinal Page Ranks Score:")
for page, rank in final_ranks:
    print(f"Page: {page}, Rank: {rank:.4f}")
# Stop the Spark session
spark.stop()