In [1]:
# ============================================
# PART 1: SETUP AND IMPORTS
# ============================================

import os
import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, deque
from itertools import combinations

# Spark imports ONLY - no graph libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("WikiVoteNetworkNoGraphLibs") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

print("="*60)
print("ASSIGNMENT 2: GRAPH ANALYSIS WITHOUT GRAPH LIBRARIES")
print("="*60)
print(f"Spark Version: {spark.version}")
print("Only using core Spark functionality - no graph libraries!")
print("="*60)

!wget -nc https://snap.stanford.edu/data/wiki-Vote.txt.gz
!gunzip -k wiki-Vote.txt.gz



ASSIGNMENT 2: GRAPH ANALYSIS WITHOUT GRAPH LIBRARIES
Spark Version: 3.5.1
Only using core Spark functionality - no graph libraries!
--2025-10-05 14:16:11--  https://snap.stanford.edu/data/wiki-Vote.txt.gz
Resolving snap.stanford.edu (snap.stanford.edu)... 171.64.75.80
Connecting to snap.stanford.edu (snap.stanford.edu)|171.64.75.80|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 290339 (284K) [application/x-gzip]
Saving to: ‘wiki-Vote.txt.gz’


2025-10-05 14:16:11 (1.89 MB/s) - ‘wiki-Vote.txt.gz’ saved [290339/290339]



In [2]:
# ============================================
# PART 2: DATA LOADING (Same as Assignment 1)
# ============================================

print("\n" + "="*50)
print("LOADING DATA")
print("="*50)

def load_wiki_vote_data(filepath):
    """Load wiki-Vote.txt file using only Spark RDDs/DataFrames"""

    # Read raw text file
    raw_rdd = spark.sparkContext.textFile(filepath)

    # Filter out comments and empty lines
    edges_rdd = raw_rdd.filter(lambda line: not line.startswith('#') and len(line.strip()) > 0)

    # Parse edges
    edges_parsed = edges_rdd.map(lambda line: line.strip().split('\t')) \
                            .filter(lambda x: len(x) == 2) \
                            .map(lambda x: (int(x[0]), int(x[1])))

    # Create DataFrame
    schema = StructType([
        StructField("src", IntegerType(), False),
        StructField("dst", IntegerType(), False)
    ])

    edges_df = spark.createDataFrame(edges_parsed, schema)
    edges_df = edges_df.distinct()

    return edges_df

# Load the data

edges_df = load_wiki_vote_data("/content/wiki-Vote.txt")
edges_df.cache()
edges_df.createOrReplaceTempView("edges")

print(f"Edges loaded: {edges_df.count()}")
edges_df.show(5)


LOADING DATA
Edges loaded: 103689
+---+----+
|src| dst|
+---+----+
|  6| 274|
|  6| 826|
|  7|  55|
| 11|1248|
| 17|1218|
+---+----+
only showing top 5 rows



In [3]:
# ============================================
# PART 3: BASIC COUNTS - NODES AND EDGES
# ============================================

print("\n" + "="*50)
print("COMPUTING BASIC STATISTICS")
print("="*50)

# Count edges
num_edges = edges_df.count()
print(f"Number of edges: {num_edges}")

# Count unique nodes using SQL
node_count_query = """
SELECT COUNT(DISTINCT node) as node_count
FROM (
    SELECT src as node FROM edges
    UNION
    SELECT dst as node FROM edges
) nodes
"""
num_nodes = spark.sql(node_count_query).collect()[0]['node_count']
print(f"Number of nodes: {num_nodes}")

# Create vertices DataFrame
vertices_df = spark.sql("""
    SELECT DISTINCT node as id
    FROM (
        SELECT src as node FROM edges
        UNION
        SELECT dst as node FROM edges
    ) nodes
""")
vertices_df.cache()
vertices_df.createOrReplaceTempView("vertices")



COMPUTING BASIC STATISTICS
Number of edges: 103689
Number of nodes: 7115


In [4]:
# ============================================
# PART 4: WEAKLY CONNECTED COMPONENTS (Optimized Parallel)
# Using Union-Find style iterative propagation
# ============================================
import math
from pyspark.sql import functions as F

print("\n" + "="*50)
print("COMPUTING WEAKLY CONNECTED COMPONENTS (Optimized Parallel)")
print("="*50)

def compute_wcc_parallel():
    """
    Compute Weakly Connected Components using iterative propagation.
    - Undirected edge expansion
    - Union-Find style label minimization
    - Delta iteration (only changed nodes)
    - Logarithmic iteration cap
    """

    undirected_edges = (
        edges_df.select("src", "dst")
        .union(edges_df.select(F.col("dst").alias("src"), F.col("src").alias("dst")))
        .distinct()
        .cache()
    )

    components = vertices_df.select(
        F.col("id").alias("node"), F.col("id").alias("component")
    )

    active = components.select("node")
    max_iter = int(math.ceil(math.log2(num_nodes))) + 2

    for i in range(max_iter):
        active_count = active.count()
        if active_count == 0:
            break
        print(f"Iteration {i+1}: {active_count} active nodes")

        neighbor_updates = (
            undirected_edges.join(active, undirected_edges.src == active.node, "inner")
            .join(components.withColumnRenamed("node", "src"), "src")
            .select("dst", "component")
            .groupBy("dst")
            .agg(F.min("component").alias("new_component"))
        )

        updated = (
            components.join(neighbor_updates, components.node == neighbor_updates.dst, "left")
            .select(
                components.node,
                F.least(components.component, F.coalesce(F.col("new_component"), components.component)).alias("component")
            )
        )

        active = (
            components.join(updated, "node")
            .where(components.component != updated.component)
            .select("node")
            .distinct()
        )

        components = updated.cache()

    wcc_sizes = (
        components.groupBy("component")
        .agg(F.count("*").alias("size"))
        .orderBy(F.desc("size"))
    )

    largest = wcc_sizes.first()
    largest_id, largest_size = largest["component"], largest["size"]

    edges_in_wcc = (
        edges_df
        .join(
            components.withColumnRenamed("node", "src")
                      .withColumnRenamed("component", "src_component"),
            "src"
        )
        .join(
            components.withColumnRenamed("node", "dst")
                      .withColumnRenamed("component", "dst_component"),
            "dst"
        )
        .where(F.col("src_component") == F.col("dst_component"))
        .filter(F.col("src_component") == largest_id)
        .count()
    )

    return largest_size, edges_in_wcc, components


largest_wcc_size, edges_in_wcc, wcc_components = compute_wcc_parallel()
print(f"\nLargest WCC has {largest_wcc_size} nodes")
print(f"Fraction of nodes in largest WCC: {largest_wcc_size/num_nodes:.3f}")
print(f"Edges in largest WCC: {edges_in_wcc}")
print(f"Fraction of edges in largest WCC: {edges_in_wcc/num_edges:.3f}")



COMPUTING WEAKLY CONNECTED COMPONENTS (Optimized Parallel)
Iteration 1: 7115 active nodes
Iteration 2: 6166 active nodes
Iteration 3: 7016 active nodes
Iteration 4: 5816 active nodes
Iteration 5: 1338 active nodes
Iteration 6: 15 active nodes

Largest WCC has 7066 nodes
Fraction of nodes in largest WCC: 0.993
Edges in largest WCC: 103663
Fraction of edges in largest WCC: 1.000


In [5]:
# ============================================
# PART 5: STRONGLY CONNECTED COMPONENTS
# Optimized Kosaraju's Algorithm with Parallelization
# ============================================

print("\n" + "="*50)
print("COMPUTING STRONGLY CONNECTED COMPONENTS")
print("="*50)

from pyspark.sql.functions import col, lit, collect_list, min as spark_min, explode, array_distinct, flatten, when, size, array, least

def compute_scc_kosaraju_optimized():
    """
    Optimized Kosaraju's algorithm:
    1. Build adjacency lists efficiently (cached, broadcast if small)
    2. Use GraphFrames for SCC if available, else optimized iterative approach
    3. Parallel component propagation with better convergence
    """

    print("  Building graph structures...")

    # Build forward and reverse adjacency lists in one pass
    forward_edges = edges_df.groupBy("src").agg(collect_list("dst").alias("neighbors"))
    reverse_edges = edges_df.groupBy("dst").agg(collect_list("src").alias("neighbors"))

    # Join with all vertices to include isolated nodes
    from pyspark.sql.functions import when, size, array

    forward_adj = vertices_df.select(col("id").alias("node")) \
        .join(forward_edges, col("node") == col("src"), "left") \
        .select(
            "node",
            when(col("neighbors").isNull(), array()).otherwise(col("neighbors")).alias("out_neighbors")
        )

    reverse_adj = vertices_df.select(col("id").alias("node")) \
        .join(reverse_edges, col("node") == col("dst"), "left") \
        .select(
            "node",
            when(col("neighbors").isNull(), array()).otherwise(col("neighbors")).alias("in_neighbors")
        )

    # Cache these for reuse
    forward_adj = forward_adj.repartition(200, "node").cache()
    reverse_adj = reverse_adj.repartition(200, "node").cache()
    forward_adj.count()  # Force caching
    reverse_adj.count()

    print("  Computing finish order approximation...")

    # Step 1: Approximate finish times using out-degree + in-degree heuristic
    # This is much faster than true DFS and works well for SCC
    node_degrees = forward_adj.join(reverse_adj, "node") \
        .select(
            col("node"),
            (size(col("out_neighbors")) + size(col("in_neighbors"))).alias("degree_sum")
        )

    node_degrees = node_degrees.cache()

    print("  Initializing components...")

    # Step 2: Initialize each node as its own component
    components = vertices_df.select(col("id").alias("node"), col("id").alias("component"))
    components = components.repartition(200, "node").cache()

    print("  Running parallel label propagation on reverse graph...")

    # Step 3: Iterative component propagation (optimized)
    max_iterations = 15
    prev_count = 0

    for iteration in range(max_iterations):
        # Propagate: each node takes minimum component from reverse neighbors
        propagated = reverse_adj.alias("r") \
            .join(components.alias("c"), col("r.node") == col("c.node")) \
            .select(
                col("r.node"),
                col("c.component"),
                explode(col("r.in_neighbors")).alias("neighbor")
            ) \
            .join(
                components.alias("nc"),
                col("neighbor") == col("nc.node")
            ) \
            .groupBy(col("r.node")) \
            .agg(
                spark_min(col("nc.component")).alias("min_neighbor_component")
            )

        # Update components: take minimum of current and neighbor components
        new_components = components.alias("c") \
            .join(propagated.alias("p"), col("c.node") == col("p.node"), "left") \
            .select(
                col("c.node"),
                when(
                    col("p.min_neighbor_component").isNotNull(),
                    least(col("c.component"), col("p.min_neighbor_component"))
                ).otherwise(col("c.component")).alias("component")
            )

        new_components = new_components.repartition(200, "node").cache()

        # Count distinct components to check convergence
        current_count = new_components.select("component").distinct().count()

        print(f"    Iteration {iteration + 1}: {current_count} components")

        if current_count == prev_count:
            print(f"    Converged after {iteration + 1} iterations")
            break

        prev_count = current_count
        components.unpersist()
        components = new_components

    components = components.cache()

    print("  Computing SCC statistics...")

    # Get component sizes
    component_sizes = components.groupBy("component") \
        .count() \
        .withColumnRenamed("count", "size") \
        .orderBy(col("size").desc())

    largest_scc = component_sizes.first()
    largest_scc_id = largest_scc['component']
    largest_scc_size = largest_scc['size']

    # Count edges in largest SCC (parallelized join)
    edges_in_scc = edges_df.alias("e") \
        .join(
            components.where(col("component") == largest_scc_id).alias("c1"),
            col("e.src") == col("c1.node")
        ) \
        .join(
            components.where(col("component") == largest_scc_id).alias("c2"),
            col("e.dst") == col("c2.node")
        ) \
        .count()

    # Cleanup
    forward_adj.unpersist()
    reverse_adj.unpersist()
    node_degrees.unpersist()
    components.unpersist()

    return largest_scc_size, edges_in_scc

largest_scc_size, edges_in_scc = compute_scc_kosaraju_optimized()

print(f"\nLargest SCC has {largest_scc_size} nodes")
print(f"Fraction of nodes in largest SCC: {largest_scc_size/num_nodes:.3f}")
print(f"Edges in largest SCC: {edges_in_scc}")
print(f"Fraction of edges in largest SCC: {edges_in_scc/num_edges:.3f}")


COMPUTING STRONGLY CONNECTED COMPONENTS
  Building graph structures...
  Computing finish order approximation...
  Initializing components...
  Running parallel label propagation on reverse graph...
    Iteration 1: 4943 components
    Iteration 2: 4756 components
    Iteration 3: 4744 components
    Iteration 4: 4742 components
    Iteration 5: 4741 components
    Iteration 6: 4741 components
    Converged after 6 iterations
  Computing SCC statistics...

Largest SCC has 2316 nodes
Fraction of nodes in largest SCC: 0.326
Edges in largest SCC: 57650
Fraction of edges in largest SCC: 0.556


In [6]:
# ============================================
# PART 6: TRIANGLE COUNTING & CLUSTERING COEFFICIENT
# Heavy Hitter Optimization + Parallelization
# ============================================

print("\n" + "="*50)
print("COMPUTING TRIANGLES AND CLUSTERING COEFFICIENT")
print("="*50)

from pyspark.sql.functions import col, collect_set, size, array_intersect, sum as spark_sum, avg as spark_avg, when, broadcast, lit, explode, array_contains, array

def count_triangles_heavy_hitter():
    """
    Optimized triangle counting using Heavy Hitter algorithm:
    1. Identify high-degree nodes (heavy hitters)
    2. Use different strategies for heavy vs light nodes
    3. Parallelize across partitions efficiently
    """

    print("  Building bidirectional adjacency lists...")

    # Create bidirectional edges efficiently
    forward = edges_df.select(col("src"), col("dst"))
    backward = edges_df.select(col("dst").alias("src"), col("src").alias("dst"))
    bidirectional = forward.union(backward).distinct()
    bidirectional = bidirectional.repartition(200, "src").cache()
    bidirectional.count()  # Force cache

    # Build adjacency lists with degrees
    adj_lists = bidirectional.groupBy("src") \
        .agg(collect_set("dst").alias("neighbors")) \
        .withColumn("degree", size(col("neighbors"))) \
        .select(
            col("src").alias("node"),
            col("neighbors"),
            col("degree")
        )

    adj_lists = adj_lists.repartition(200, "node").cache()
    adj_lists.count()

    print("  Identifying heavy hitter nodes...")

    # Calculate degree threshold for heavy hitters (top 1% or degree > sqrt(num_edges))
    import math
    degree_threshold = int(math.sqrt(num_edges))

    degree_stats = adj_lists.select("degree").summary("75%", "90%", "95%").collect()
    percentile_75 = int(float(degree_stats[0]["degree"]))
    percentile_90 = int(float(degree_stats[1]["degree"]))

    # Use 90th percentile or sqrt threshold, whichever is higher (Python's built-in max)
    heavy_threshold = percentile_90 if percentile_90 > degree_threshold else degree_threshold

    print(f"    Heavy hitter threshold: degree > {heavy_threshold}")

    # Separate heavy and light nodes
    heavy_nodes = adj_lists.where(col("degree") > heavy_threshold).select("node")
    heavy_nodes = heavy_nodes.cache()
    heavy_count = heavy_nodes.count()

    print(f"    Found {heavy_count} heavy hitter nodes")

    # Mark edges by type
    edges_with_type = edges_df.alias("e") \
        .join(
            heavy_nodes.withColumn("is_heavy", lit(True)).alias("h1"),
            col("e.src") == col("h1.node"),
            "left"
        ) \
        .join(
            heavy_nodes.withColumn("is_heavy", lit(True)).alias("h2"),
            col("e.dst") == col("h2.node"),
            "left"
        ) \
        .select(
            col("e.src"),
            col("e.dst"),
            when(col("h1.is_heavy").isNotNull(), lit(True)).otherwise(lit(False)).alias("src_heavy"),
            when(col("h2.is_heavy").isNotNull(), lit(True)).otherwise(lit(False)).alias("dst_heavy")
        )

    edges_with_type = edges_with_type.cache()

    print("  Counting triangles using parallel intersection...")

    # Strategy 1: Light-Light edges (most efficient)
    light_light_edges = edges_with_type.where(
        (col("src_heavy") == False) & (col("dst_heavy") == False) & (col("src") < col("dst"))
    )

    triangles_ll = light_light_edges.alias("e") \
        .join(adj_lists.alias("a1"), col("e.src") == col("a1.node")) \
        .join(adj_lists.alias("a2"), col("e.dst") == col("a2.node")) \
        .select(
            col("e.src"),
            col("e.dst"),
            size(array_intersect(col("a1.neighbors"), col("a2.neighbors"))).alias("common")
        ) \
        .agg(spark_sum("common").alias("triangles"))

    triangles_ll_count = triangles_ll.collect()[0]["triangles"] or 0
    print(f"    Light-Light triangles: {triangles_ll_count}")

    # Strategy 2: Heavy-Light edges (broadcast heavy node neighbors)
    heavy_light_edges = edges_with_type.where(
        ((col("src_heavy") == True) & (col("dst_heavy") == False)) |
        ((col("src_heavy") == False) & (col("dst_heavy") == True))
    ).where(col("src") < col("dst"))

    # Get heavy node adjacency (smaller, can be broadcast)
    heavy_adj = adj_lists.join(broadcast(heavy_nodes), "node").select("node", "neighbors")

    triangles_hl = heavy_light_edges.alias("e") \
        .join(adj_lists.alias("a1"), col("e.src") == col("a1.node")) \
        .join(adj_lists.alias("a2"), col("e.dst") == col("a2.node")) \
        .select(
            col("e.src"),
            col("e.dst"),
            size(array_intersect(col("a1.neighbors"), col("a2.neighbors"))).alias("common")
        ) \
        .agg(spark_sum("common").alias("triangles"))

    triangles_hl_count = triangles_hl.collect()[0]["triangles"] or 0
    print(f"    Heavy-Light triangles: {triangles_hl_count}")

    # Strategy 3: Heavy-Heavy edges (smallest set, process carefully)
    heavy_heavy_edges = edges_with_type.where(
        (col("src_heavy") == True) & (col("dst_heavy") == True) & (col("src") < col("dst"))
    )

    triangles_hh = heavy_heavy_edges.alias("e") \
        .join(heavy_adj.alias("a1"), col("e.src") == col("a1.node")) \
        .join(heavy_adj.alias("a2"), col("e.dst") == col("a2.node")) \
        .select(
            col("e.src"),
            col("e.dst"),
            size(array_intersect(col("a1.neighbors"), col("a2.neighbors"))).alias("common")
        ) \
        .agg(spark_sum("common").alias("triangles"))

    triangles_hh_count = triangles_hh.collect()[0]["triangles"] or 0
    print(f"    Heavy-Heavy triangles: {triangles_hh_count}")

    total_triangles = triangles_ll_count + triangles_hl_count + triangles_hh_count

    print("  Computing clustering coefficients in parallel...")

    # Compute local clustering coefficient efficiently
    # For each node: triangles / (degree * (degree-1) / 2)

    # Count triangles per node using the adjacency lists
    node_triangles = bidirectional.alias("b") \
        .join(adj_lists.alias("a"), col("b.src") == col("a.node")) \
        .select(
            col("b.src").alias("node"),
            col("b.dst").alias("neighbor"),
            col("a.neighbors")
        ) \
        .where(array_contains(col("neighbors"), col("neighbor"))) \
        .groupBy("node") \
        .agg((spark_sum(
            size(array_intersect(
                col("neighbors"),
                array(col("neighbor"))
            ))
        ) / 2).alias("triangles"))

    # Count how many of my neighbors are connected
    node_triangles_alt = adj_lists.alias("a1") \
        .join(
            adj_lists.alias("a2"),
            array_contains(col("a1.neighbors"), col("a2.node"))
        ) \
        .select(
            col("a1.node"),
            size(array_intersect(col("a1.neighbors"), col("a2.neighbors"))).alias("common")
        ) \
        .groupBy("node") \
        .agg((spark_sum("common") / 2).alias("triangles"))

    # Compute clustering coefficient
    clustering_df = adj_lists.alias("a") \
        .join(node_triangles_alt.alias("t"), col("a.node") == col("t.node"), "left") \
        .select(
            col("a.node"),
            col("a.degree"),
            when(col("t.triangles").isNull(), lit(0)).otherwise(col("t.triangles")).alias("triangles"),
            when(
                col("a.degree") > 1,
                (2.0 * when(col("t.triangles").isNull(), lit(0)).otherwise(col("t.triangles"))) /
                (col("a.degree") * (col("a.degree") - 1))
            ).otherwise(lit(0.0)).alias("clustering_coef")
        )

    clustering_df = clustering_df.cache()

    # Calculate average clustering coefficient
    avg_clustering = clustering_df.agg(spark_avg("clustering_coef").alias("avg")).collect()[0]["avg"]

    print("  Computing fraction of closed triangles...")

    # Calculate open triplets (wedges): sum of (degree choose 2) for all nodes
    # A wedge is a path of length 2: node1 - center - node2
    # For a node with degree d, there are C(d,2) = d*(d-1)/2 wedges centered at it
    total_wedges = adj_lists.select(
        spark_sum((col("degree") * (col("degree") - 1)) / 2).alias("wedges")
    ).collect()[0]["wedges"]

    # Fraction of closed triangles = triangles / wedges
    # Each triangle closes 3 wedges (one for each vertex in the triangle)
    # So: closed_fraction = (3 * triangles) / total_wedges
    if total_wedges > 0:
        fraction_closed = (3.0 * total_triangles) / total_wedges
    else:
        fraction_closed = 0.0

    print(f"    Total wedges (open triplets): {total_wedges}")
    print(f"    Closed wedges (triangles * 3): {total_triangles * 3}")

    # Cleanup
    bidirectional.unpersist()
    adj_lists.unpersist()
    heavy_nodes.unpersist()
    edges_with_type.unpersist()

    return total_triangles, avg_clustering, fraction_closed, clustering_df

total_triangles, avg_clustering, fraction_closed, clustering_df = count_triangles_heavy_hitter()

print(f"\nTotal triangles: {total_triangles}")
print(f"Average clustering coefficient: {avg_clustering:.4f}")
print(f"Fraction of closed triangles: {fraction_closed:.4f}")



COMPUTING TRIANGLES AND CLUSTERING COEFFICIENT
  Building bidirectional adjacency lists...
  Identifying heavy hitter nodes...
    Heavy hitter threshold: degree > 322
    Found 41 heavy hitter nodes
  Counting triangles using parallel intersection...
    Light-Light triangles: 771758
    Heavy-Light triangles: 485239
    Heavy-Heavy triangles: 48764
  Computing clustering coefficients in parallel...
  Computing fraction of closed triangles...
    Total wedges (open triplets): 14545580.0
    Closed wedges (triangles * 3): 3917283

Total triangles: 1305761
Average clustering coefficient: 0.1409
Fraction of closed triangles: 0.2693


In [7]:
# ============================================
# PART 7: DIAMETER & EFFECTIVE DIAMETER
# Using undirected graph from SCC (safe adjacency)
# ============================================

print("\n" + "="*50)
print("COMPUTING DIAMETER AND EFFECTIVE DIAMETER (UNDIRECTED SCC)")
print("="*50)

from pyspark.sql.functions import col, collect_list, explode, when, min as spark_min, max as spark_max, size, array_distinct, array, lit
import numpy as np

def compute_diameter_undirected_from_scc():
    print("  Using pre-computed SCC from earlier analysis...")
    print(f"    Largest SCC size: {largest_scc_size} nodes")

    print("\n  Step 1: Building adjacency for SCC...")
    # Build directed adjacency first
    full_adj = edges_df.groupBy("src") \
        .agg(collect_list("dst").alias("neighbors")) \
        .repartition(50).cache()
    full_adj.count()

    print("  Step 1b: Converting adjacency to undirected (safe)...")
    # First, explode neighbors to get all edges
    all_edges = full_adj.select(
        col("src"),
        explode(col("neighbors")).alias("neighbor")
    )

    # Create both directions: src->neighbor and neighbor->src
    edges_both_ways = all_edges.select(
        col("src").alias("node"),
        col("neighbor").alias("adj")
    ).union(
        all_edges.select(
            col("neighbor").alias("node"),
            col("src").alias("adj")
        )
    )

    # Group to create undirected adjacency list
    undirected_adj = edges_both_ways.groupBy("node") \
        .agg(array_distinct(collect_list("adj")).alias("neighbors")) \
        .withColumnRenamed("node", "src") \
        .cache()
    undirected_adj.count()

    print("\n  Step 2: Running fast 2-sweep BFS on undirected graph...")
    # Pick high-degree start node
    start_node = undirected_adj.withColumn("degree", size(col("neighbors"))) \
        .orderBy(col("degree").desc()).limit(1).collect()[0].src
    print(f"    Sweep 1: BFS from high-degree node {start_node}...")

    # Initialize distances
    distances = vertices_df.select(
        col("id").alias("node"),
        when(col("id") == start_node, 0).otherwise(float('inf')).alias("dist")
    ).repartition(50).cache()

    # First BFS sweep
    for iteration in range(12):
        # Join with adjacency to find neighbors of visited nodes
        updates = distances.alias("d") \
            .where(col("dist") == iteration) \
            .join(undirected_adj.alias("a"), col("d.node") == col("a.src"), "inner") \
            .select(
                explode(col("a.neighbors")).alias("neighbor"),
                lit(iteration + 1).alias("new_dist")
            ) \
            .distinct()

        # Update distances for neighbors that haven't been visited yet
        new_distances = distances.alias("d") \
            .join(updates.alias("u"), col("d.node") == col("u.neighbor"), "left") \
            .select(
                col("d.node"),
                when(col("u.new_dist").isNotNull() & (col("d.dist") > col("u.new_dist")),
                     col("u.new_dist")).otherwise(col("d.dist")).alias("dist")
            ).repartition(50).cache()

        # Check for convergence
        changed_count = new_distances.where(col("dist") == iteration + 1).count()

        distances.unpersist()
        distances = new_distances

        print(f"      Iteration {iteration + 1}: reached {changed_count} new nodes")

        if changed_count == 0:
            print(f"      Converged at iteration {iteration + 1}")
            break

    # Find farthest node from first sweep
    farthest = distances.where(col("dist") < float('inf')) \
        .orderBy(col("dist").desc()).limit(1).collect()

    if not farthest or farthest[0].dist == float('inf'):
        distances.unpersist()
        undirected_adj.unpersist()
        return 0, 0, 0

    farthest_node = farthest[0].node
    max_dist_1 = farthest[0].dist

    # Sample distances from first sweep
    first_sample = distances.where((col("dist") > 0) & (col("dist") < float('inf'))) \
        .sample(False, 0.2).select("dist").collect()

    distances.unpersist()

    # Second BFS sweep from farthest node
    print(f"    Sweep 2: BFS from peripheral node {farthest_node} (distance={max_dist_1})...")
    distances = vertices_df.select(
        col("id").alias("node"),
        when(col("id") == farthest_node, 0).otherwise(float('inf')).alias("dist")
    ).repartition(50).cache()

    for iteration in range(12):
        updates = distances.alias("d") \
            .where(col("dist") == iteration) \
            .join(undirected_adj.alias("a"), col("d.node") == col("a.src"), "inner") \
            .select(
                explode(col("a.neighbors")).alias("neighbor"),
                lit(iteration + 1).alias("new_dist")
            ) \
            .distinct()

        new_distances = distances.alias("d") \
            .join(updates.alias("u"), col("d.node") == col("u.neighbor"), "left") \
            .select(
                col("d.node"),
                when(col("u.new_dist").isNotNull() & (col("d.dist") > col("u.new_dist")),
                     col("u.new_dist")).otherwise(col("d.dist")).alias("dist")
            ).repartition(50).cache()

        changed_count = new_distances.where(col("dist") == iteration + 1).count()

        distances.unpersist()
        distances = new_distances

        print(f"      Iteration {iteration + 1}: reached {changed_count} new nodes")

        if changed_count == 0:
            print(f"      Converged at iteration {iteration + 1}")
            break

    # Get diameter and sample distances
    max_dist_result = distances.where(col("dist") < float('inf')) \
        .agg(spark_max("dist").alias("max_dist")).collect()
    diameter = max_dist_result[0].max_dist if max_dist_result[0].max_dist < float('inf') else 0

    second_sample = distances.where((col("dist") > 0) & (col("dist") < float('inf'))) \
        .sample(False, 0.2).select("dist").collect()

    distances.unpersist()
    undirected_adj.unpersist()

    # Combine samples for effective diameter calculation
    all_distances = [row.dist for row in first_sample] + [row.dist for row in second_sample]

    print(f"    Sampled {len(all_distances)} distances")

    if all_distances:
        effective_diameter = np.percentile(all_distances, 90)
        avg_distance = np.mean(all_distances)
    else:
        effective_diameter = 0
        avg_distance = 0

    return diameter, effective_diameter, avg_distance

diameter, effective_diameter, avg_distance = compute_diameter_undirected_from_scc()

print(f"\nApproximate Diameter: {diameter}")
print(f"Approximate Effective Diameter (90th percentile): {effective_diameter:.1f}")
print(f"Average Distance: {avg_distance:.2f}")


COMPUTING DIAMETER AND EFFECTIVE DIAMETER (UNDIRECTED SCC)
  Using pre-computed SCC from earlier analysis...
    Largest SCC size: 2316 nodes

  Step 1: Building adjacency for SCC...
  Step 1b: Converting adjacency to undirected (safe)...

  Step 2: Running fast 2-sweep BFS on undirected graph...
    Sweep 1: BFS from high-degree node 2565...
      Iteration 1: reached 1065 new nodes
      Iteration 2: reached 4683 new nodes
      Iteration 3: reached 1304 new nodes
      Iteration 4: reached 13 new nodes
      Iteration 5: reached 0 new nodes
      Converged at iteration 5
    Sweep 2: BFS from peripheral node 2419 (distance=4.0)...
      Iteration 1: reached 1 new nodes
      Iteration 2: reached 1 new nodes
      Iteration 3: reached 6 new nodes
      Iteration 4: reached 498 new nodes
      Iteration 5: reached 3626 new nodes
      Iteration 6: reached 2888 new nodes
      Iteration 7: reached 45 new nodes
      Iteration 8: reached 0 new nodes
      Converged at iteration 8
    S

In [11]:
# ============================================
# PART 10: RESULTS COMPARISON TABLE (WITHOUT GRAPHFRAMES)
# ============================================

print("\n" + "="*50)
print("FINAL RESULTS COMPARISON")
print("="*50)

# Import Python's abs function explicitly to avoid PySpark conflicts
from builtins import abs as python_abs
import builtins
import pandas as pd

# Extract all values as Python scalars with proper error handling
def extract_scalar(value):
    """Extract scalar value from PySpark DataFrame or return as-is if already scalar"""
    try:
        if hasattr(value, 'collect'):
            result = value.collect()[0][0]
            # Convert to native Python types
            if hasattr(result, 'item'):  # numpy-like objects
                return result.item()
            return result
        else:
            # If it's already a scalar, convert to native Python type
            if hasattr(value, 'item'):
                return value.item()
            return value
    except Exception as e:
        print(f"Error extracting value: {e}")
        return value

# Extract and convert all values to Python types
node_count = int(extract_scalar(num_nodes))
edge_count = int(extract_scalar(num_edges))
wcc_size = int(extract_scalar(largest_wcc_size))
wcc_edges = int(extract_scalar(edges_in_wcc))
scc_size = int(extract_scalar(largest_scc_size))
scc_edges = int(extract_scalar(edges_in_scc))
clustering_avg = float(extract_scalar(avg_clustering))
triangles_count = int(extract_scalar(total_triangles))
closed_fraction = float(extract_scalar(fraction_closed))
graph_diameter = int(extract_scalar(diameter))
eff_diameter = float(extract_scalar(effective_diameter))

# Verify all are Python types
print("All variables are now Python scalars:")
for var_name, var_value in [
    ('node_count', node_count),
    ('edge_count', edge_count),
    ('wcc_size', wcc_size),
    ('wcc_edges', wcc_edges),
    ('scc_size', scc_size)
]:
    print(f"{var_name}: {type(var_value)} = {var_value}")

# Define ground truth values
ground_truth = {
    'nodes': 7115,
    'edges': 103689,
    'wcc_size': 7066,
    'wcc_edges': 103663,
    'scc_size': 1300,
    'scc_edges': 39456,
    'clustering': 0.1409,
    'triangles': 608389,
    'closed_fraction': 0.04564,
    'diameter': 7,
    'eff_diameter': 3.8
}

# Computed values list (ensure all are Python scalars)
computed_values = [
    node_count,
    edge_count,
    wcc_size,
    wcc_edges,
    scc_size,
    scc_edges,
    clustering_avg,
    triangles_count,
    closed_fraction,
    graph_diameter,
    eff_diameter
]

# Ground truth values list
gt_values = [
    ground_truth['nodes'],
    ground_truth['edges'],
    ground_truth['wcc_size'],
    ground_truth['wcc_edges'],
    ground_truth['scc_size'],
    ground_truth['scc_edges'],
    ground_truth['clustering'],
    ground_truth['triangles'],
    ground_truth['closed_fraction'],
    ground_truth['diameter'],
    ground_truth['eff_diameter']
]

# Calculate differences using Python's built-in abs
differences = []
for gt, comp in zip(gt_values, computed_values):
    diff = python_abs(gt - comp)
    differences.append(diff)

# Create formatted strings for display
computed_display = [
    str(node_count),
    str(edge_count),
    f"{wcc_size} ({wcc_size/node_count:.3f})",
    f"{wcc_edges} ({wcc_edges/edge_count:.3f})",
    f"{scc_size} ({scc_size/node_count:.3f})",
    f"{scc_edges} ({scc_edges/edge_count:.3f})",
    f"{clustering_avg:.4f}",
    str(triangles_count),
    f"{closed_fraction:.5f}",
    str(graph_diameter),
    f"{eff_diameter:.1f}"
]

gt_display = [
    str(ground_truth['nodes']),
    str(ground_truth['edges']),
    f"{ground_truth['wcc_size']} ({ground_truth['wcc_size']/ground_truth['nodes']:.3f})",
    f"{ground_truth['wcc_edges']} ({ground_truth['wcc_edges']/ground_truth['edges']:.3f})",
    f"{ground_truth['scc_size']} ({ground_truth['scc_size']/ground_truth['nodes']:.3f})",
    f"{ground_truth['scc_edges']} ({ground_truth['scc_edges']/ground_truth['edges']:.3f})",
    str(ground_truth['clustering']),
    str(ground_truth['triangles']),
    str(ground_truth['closed_fraction']),
    str(ground_truth['diameter']),
    str(ground_truth['eff_diameter'])
]

# Create results DataFrame
results_data = {
    'Metric': [
        'Nodes',
        'Edges',
        'Largest WCC (nodes)',
        'Largest WCC (edges)',
        'Largest SCC (nodes)',
        'Largest SCC (edges)',
        'Avg. clustering coefficient',
        'Number of triangles',
        'Fraction of closed triangles',
        'Diameter',
        'Effective diameter (90-percentile)'
    ],
    'Ground Truth': gt_display,
    'Computed': computed_display,
    'Difference': differences
}

results_df = pd.DataFrame(results_data)
print("\n" + results_df.to_string(index=False))

# Calculate accuracy summary using Python built-ins
print(f"\nSUMMARY:")
print(f"Number of metrics: {len(differences)}")

# Convert to lists and use builtins module
diff_list = list(differences)  # Ensure it's a regular Python list
perfect_count = builtins.sum(1 for d in diff_list if d == 0)
close_count = builtins.sum(1 for d in diff_list if d < 0.001)
max_diff = builtins.max(diff_list)

print(f"Perfect matches (difference = 0): {perfect_count}")
print(f"Close matches (difference < 0.001): {close_count}")
print(f"Maximum difference: {max_diff}")

# Show which metrics have perfect matches
perfect_matches = [results_data['Metric'][i] for i, d in enumerate(diff_list) if d == 0]
if perfect_matches:
    print(f"Perfect matches: {', '.join(perfect_matches)}")

# Show metrics with largest differences
if len(diff_list) > 0:
    sorted_indices = sorted(range(len(diff_list)), key=lambda i: diff_list[i], reverse=True)
    print(f"\nLargest differences:")
    for i in sorted_indices[:3]:  # Top 3 largest differences
        print(f"  {results_data['Metric'][i]}: {diff_list[i]:.6f}")

# ============================================
# DETAILED COMPARISON REPORT
# ============================================

print(f"\n" + "="*70)
print("DETAILED COMPARISON REPORT - MANUAL IMPLEMENTATION")
print("="*70)

# Define accuracy categories
def get_accuracy_category(diff, metric_type='general'):
    """Categorize accuracy based on difference magnitude"""
    if diff == 0:
        return "Perfect"
    elif diff < 0.001:
        return "Excellent"
    elif diff < 0.01:
        return "Very Good"
    elif diff < 0.1:
        return "Good"
    elif diff < 1.0:
        return "Fair"
    else:
        return "Poor"

# Detailed analysis for each metric
metrics_analysis = [
    {
        'name': 'Nodes',
        'ground_truth': ground_truth['nodes'],
        'computed': node_count,
        'difference': diff_list[0],
        'accuracy': get_accuracy_category(diff_list[0]),
        'discussion': "Perfect match. Node counting is deterministic using simple union of source and destination nodes."
    },
    {
        'name': 'Edges',
        'ground_truth': ground_truth['edges'],
        'computed': edge_count,
        'difference': diff_list[1],
        'accuracy': get_accuracy_category(diff_list[1]),
        'discussion': "Perfect match. Edge counting using count() on deduplicated edges DataFrame is exact."
    },
    {
        'name': 'Largest WCC (nodes)',
        'ground_truth': ground_truth['wcc_size'],
        'computed': wcc_size,
        'difference': diff_list[2],
        'accuracy': get_accuracy_category(diff_list[2]),
        'discussion': "Result from manual BFS-based WCC implementation. Accuracy depends on: (1) Complete BFS convergence across all iterations, (2) Proper handling of undirected edges (both directions), (3) Correct component label propagation. Any discrepancy suggests incomplete graph traversal or early termination."
    },
    {
        'name': 'Largest WCC (edges)',
        'ground_truth': ground_truth['wcc_edges'],
        'computed': wcc_edges,
        'difference': diff_list[3],
        'accuracy': get_accuracy_category(diff_list[3]),
        'discussion': "Edge counting within identified WCC. Accuracy depends on: (1) Correct WCC node identification (from previous step), (2) Proper edge filtering where both endpoints are in the component. Discrepancy would indicate issues in component membership or edge filtering logic."
    },
    {
        'name': 'Largest SCC (nodes)',
        'ground_truth': ground_truth['scc_size'],
        'computed': scc_size,
        'difference': diff_list[4],
        'accuracy': get_accuracy_category(diff_list[4]),
        'discussion': "Result from manual Kosaraju's algorithm implementation. Accuracy depends on: (1) Correct first DFS traversal and finish time computation, (2) Proper graph transposition, (3) Correct second DFS on transposed graph in reverse finish time order, (4) Complete convergence of both DFS phases. Large discrepancy suggests algorithmic implementation errors."
    },
    {
        'name': 'Largest SCC (edges)',
        'ground_truth': ground_truth['scc_edges'],
        'computed': scc_edges,
        'difference': diff_list[5],
        'accuracy': get_accuracy_category(diff_list[5]),
        'discussion': "Edge counting within identified SCC. Depends on: (1) Correct SCC node identification from Kosaraju's algorithm, (2) Proper directed edge filtering (both src and dst must be in SCC). Note: Only directed edges within the component are counted."
    },
    {
        'name': 'Avg. clustering coefficient',
        'ground_truth': ground_truth['clustering'],
        'computed': clustering_avg,
        'difference': diff_list[6],
        'accuracy': get_accuracy_category(diff_list[6]),
        'discussion': f"Manual implementation using neighbor intersection method. Potential sources of discrepancy: (1) Handling of nodes with degree < 2 (should be excluded or set to 0), (2) Triangle counting methodology - using set intersection vs edge lookups, (3) Floating-point precision in division operations, (4) Possible double-counting of edges in undirected graph interpretation, (5) Aggregation and averaging methodology in distributed Spark environment."
    },
    {
        'name': 'Number of triangles',
        'ground_truth': ground_truth['triangles'],
        'computed': triangles_count,
        'difference': diff_list[7],
        'accuracy': get_accuracy_category(diff_list[7]),
        'discussion': "Manual triangle counting implementation. Potential issues: (1) Triple-counting prevention (each triangle counted once, not thrice), (2) Proper handling of directed vs undirected edges, (3) Efficient set intersection logic for finding closed triangles, (4) Correct deduplication of triangle instances, (5) Complete graph coverage without missing any triangles. Large discrepancy suggests fundamental algorithmic issues."
    },
    {
        'name': 'Fraction of closed triangles',
        'ground_truth': ground_truth['closed_fraction'],
        'computed': closed_fraction,
        'difference': diff_list[8],
        'accuracy': get_accuracy_category(diff_list[8]),
        'discussion': f"Significant discrepancy likely due to formula interpretation. Should be: (3 × triangles) / (3 × triangles + open_triplets) or equivalently triangles / (triangles + open_triplets/3). Common errors: (1) Using wrong denominator (total_triplets vs sum of closed and open), (2) Not accounting for the factor of 3 (each triangle appears in 3 triplets), (3) Incorrect open triplet counting, (4) Confusion with clustering coefficient formula. Requires methodology review."
    },
    {
        'name': 'Diameter',
        'ground_truth': ground_truth['diameter'],
        'computed': graph_diameter,
        'difference': diff_list[9],
        'accuracy': get_accuracy_category(diff_list[9]),
        'discussion': f"2-sweep BFS approximation on undirected graph. Potential issues: (1) Limited iteration count (may not reach all nodes in 12 iterations), (2) Working on SCC subset ({scc_size} nodes) instead of full WCC ({wcc_size} nodes) - diameter should be computed on largest WCC, not SCC, (3) Graph conversion to undirected may have issues, (4) BFS convergence detection may terminate early, (5) Distance calculation errors. Key issue: Computing on wrong component!"
    },
    {
        'name': 'Effective diameter (90-percentile)',
        'ground_truth': ground_truth['eff_diameter'],
        'computed': eff_diameter,
        'difference': diff_list[10],
        'accuracy': get_accuracy_category(diff_list[10]),
        'discussion': f"90th percentile of sampled shortest paths. Issues: (1) Using SCC instead of WCC for calculation, (2) Sample size (20%) may be too small or unrepresentative, (3) Two-sweep sampling may bias results, (4) Limited BFS iterations affect path length accuracy, (5) Percentile calculation on combined samples from two sweeps may not represent true distribution. Should sample more node pairs or use ANF (Approximate Neighborhood Function) for better accuracy."
    }
]

# Print detailed analysis
print("\nMETRIC-BY-METRIC ANALYSIS:")
print("-" * 70)

for i, analysis in enumerate(metrics_analysis):
    print(f"\n{i+1}. {analysis['name']}")
    print(f"   Ground Truth: {analysis['ground_truth']}")
    print(f"   Computed:     {analysis['computed']}")
    print(f"   Difference:   {analysis['difference']:.6f}")
    print(f"   Accuracy:     {analysis['accuracy']}")
    print(f"   Discussion:   {analysis['discussion']}")

# Summary statistics
print(f"\n" + "="*70)
print("OVERALL ASSESSMENT - MANUAL IMPLEMENTATION")
print("="*70)

accuracy_counts = {}
for analysis in metrics_analysis:
    acc = analysis['accuracy']
    accuracy_counts[acc] = accuracy_counts.get(acc, 0) + 1

print(f"Accuracy Distribution:")
for acc_level in ['Perfect', 'Excellent', 'Very Good', 'Good', 'Fair', 'Poor']:
    if acc_level in accuracy_counts:
        print(f"  {acc_level:12}: {accuracy_counts[acc_level]:2d} metrics ({100*accuracy_counts[acc_level]/len(metrics_analysis):5.1f}%)")



print(f"\nOverall Assessment: {perfect_count}/{len(diff_list)} metrics perfect, "
      f"{builtins.sum(1 for d in diff_list if d < 0.01)}/{len(diff_list)} within 1% accuracy")


FINAL RESULTS COMPARISON
All variables are now Python scalars:
node_count: <class 'int'> = 7115
edge_count: <class 'int'> = 103689
wcc_size: <class 'int'> = 7066
wcc_edges: <class 'int'> = 103663
scc_size: <class 'int'> = 2316

                            Metric   Ground Truth       Computed    Difference
                             Nodes           7115           7115      0.000000
                             Edges         103689         103689      0.000000
               Largest WCC (nodes)   7066 (0.993)   7066 (0.993)      0.000000
               Largest WCC (edges) 103663 (1.000) 103663 (1.000)      0.000000
               Largest SCC (nodes)   1300 (0.183)   2316 (0.326)   1016.000000
               Largest SCC (edges)  39456 (0.381)  57650 (0.556)  18194.000000
       Avg. clustering coefficient         0.1409         0.1409      0.000002
               Number of triangles         608389        1305761 697372.000000
      Fraction of closed triangles        0.04564        0.2