In [3]:
# =======  BDA Assignment-1 : Wikipedia Vote Graph ========


In [None]:
!pip install pyspark==3.5.0
!pip install graphframes

In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, trim, when, explode, sum as spark_sum
from math import comb
from graphframes import GraphFrame
import matplotlib.pyplot as plt
import seaborn as sns

In [20]:
# spark session with GraphFrames
spark = SparkSession.builder.appName("WikiVoteAnalysis") \
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.0-s_2.12") \
    .getOrCreate()

# set checkpoint directory needed for WCC / SCC
spark.sparkContext.setCheckpointDir("/tmp/graphframes-checkpoint")

In [21]:
# Download & Load Dataset
!wget -nc -O /content/wiki-Vote.txt.gz https://snap.stanford.edu/data/wiki-Vote.txt.gz
!gunzip -f /content/wiki-Vote.txt.gz



--2025-09-20 14:06:19--  https://snap.stanford.edu/data/wiki-Vote.txt.gz
Resolving snap.stanford.edu (snap.stanford.edu)... 171.64.75.80
Connecting to snap.stanford.edu (snap.stanford.edu)|171.64.75.80|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 290339 (284K) [application/x-gzip]
Saving to: ‘/content/wiki-Vote.txt.gz’


2025-09-20 14:06:20 (407 KB/s) - ‘/content/wiki-Vote.txt.gz’ saved [290339/290339]



In [29]:
raw = spark.read.text("/content/wiki-Vote.txt")

edges = (raw.filter(~col("value").startswith("#")).withColumn("parts", split(trim(col("value")), "\\s+"))
         .withColumn("src", col("parts").getItem(0).cast("int"))
         .withColumn("dst", col("parts").getItem(1).cast("int"))
         .select("src", "dst")
         .filter(col("src").isNotNull() & col("dst").isNotNull())
        )

# remove selfloop and duplicate
edges = edges.filter(col("src") != col("dst")).dropDuplicates()

# vertice
vertices = (edges.select(col("src").alias("id")).union(edges.select(col("dst").alias("id"))).distinct())

g = GraphFrame(vertices, edges)

# basic Graph Stat

num_nodes = vertices.count()
num_edges = edges.count()
print(f"Nodes: {num_nodes}, Edges: {num_edges}")



Nodes: 7115, Edges: 103689


In [32]:

# Step 3: weakly connected component (WCC)

wcc = g.connectedComponents()
wcc_sizes = wcc.groupBy("component").count().orderBy("count", ascending=False)
largest_wcc_nodes = wcc_sizes.first()["count"]

largest_comp = wcc_sizes.first()["component"]
wcc_edges = (wcc.join(edges, wcc.id == edges.src, "inner").filter(wcc.component == largest_comp))
largest_wcc_edges = wcc_edges.count()

print(f"Largest WCC: {largest_wcc_nodes} nodes, {largest_wcc_edges} edges")
# # Visualize WCC distribution
# wcc_sizes_pd = wcc_sizes.toPandas()
# plt.figure(figsize=(12, 6))
# sns.barplot(x=wcc_sizes_pd.index, y=wcc_sizes_pd['count'])
# plt.yscale('log')
# plt.title('Distribution of Weakly Connected Component Sizes', fontsize=16)
# plt.xlabel('Component Rank (Sorted by Size)', fontsize=12)
# plt.ylabel('Number of Nodes (Log Scale)', fontsize=12)
# plt.xticks(rotation=45)
# plt.tight_layout()




Largest WCC: 7066 nodes, 103663 edges


In [24]:

# Step 4: strongly connected components (SCC)

scc = g.stronglyConnectedComponents(maxIter=10)
scc_sizes = scc.groupBy("component").count().orderBy("count", ascending=False)
largest_scc = scc_sizes.first()
largest_scc_nodes = largest_scc['count']
largest_scc_id = largest_scc['component']

# find edge inside largest SCC properly
scc_with_edges = (edges.join(scc.select(col("id").alias("src"), col("component").alias("src_comp")), on="src")
    .join(scc.select(col("id").alias("dst"), col("component").alias("dst_comp")), on="dst")
)

largest_scc_edges = scc_with_edges.filter(
    (col("src_comp") == largest_scc_id) & (col("dst_comp") == largest_scc_id)
).count()

print(f"Largest SCC: {largest_scc_nodes} nodes, {largest_scc_edges} edges")

Largest SCC: 1300 nodes, 39456 edges


In [25]:

# Step 5: Clustering Coefficient & Triangle

triangles = g.triangleCount()

# total triangle -- each counted 3 time
num_triangles = triangles.agg(spark_sum("count")).first()[0] // 3

# correctly calculate total number of connected triplet
degrees = g.degrees.withColumn("degree", col("degree").cast("long"))
total_connected_triplets = degrees.agg(spark_sum(col("degree") * (col("degree") - 1) / 2)).first()[0]

clustering = (triangles.join(g.degrees, "id").withColumn("local_cc", when(col("degree") < 2, 0.0).otherwise(col("count") / (col("degree") * (col("degree") - 1) / 2)))
)
avg_clustering = clustering.agg({"local_cc": "avg"}).first()[0]

fraction_closed = num_triangles / total_connected_triplets

print(f"Average Clustering Coefficient: {avg_clustering:.4f}")
print(f"Number of Triangles: {num_triangles}")
print(f"Fraction of Closed Triangles: {fraction_closed:.5f}")

Average Clustering Coefficient: 0.1387
Number of Triangles: 608389
Fraction of Closed Triangles: 0.03829


In [26]:

# Step 6: Distance Metrics

def distance_metrics(graph, sample_fraction=0.01):
    sample_nodes = [row["id"] for row in graph.vertices.sample(False, sample_fraction, seed=42).collect()]
    if not sample_nodes:
        return None, None

    # use BFS for an estimate of shortest path
    sp_df = g.shortestPaths(landmarks=sample_nodes)

    distances_df = sp_df.select(explode(col("distances")).alias("key", "distance"))
    all_distances = distances_df.select("distance").rdd.map(lambda row: row[0]).collect()

    if not all_distances:
        return None, None

    diameter = max(all_distances) if all_distances else None

    all_distances.sort()
    effective_diameter = all_distances[int(0.9 * len(all_distances))]
    return diameter, effective_diameter

diameter, eff_diameter = distance_metrics(g, sample_fraction=0.01)
print(f"Diameter: {diameter}, Effective Diameter: {eff_diameter}")

Diameter: 9, Effective Diameter: 4


In [27]:

# Step 7: Comparison Report

ground_truth = {
    "Nodes": 7115,
    "Edges": 103689,
    "Largest WCC (nodes)": 7066,
    "Largest WCC (edges)": 103663,
    "Largest SCC (nodes)": 1300,
    "Largest SCC (edges)": 39456,
    "Avg. clustering coefficient": 0.1409,
    "Number of triangles": 608389,
    "Fraction of closed triangles": 0.04564,
    "Diameter": 7,
    "Effective diameter": 3.8
}

results = {
    "Nodes": num_nodes,
    "Edges": num_edges,
    "Largest WCC (nodes)": largest_wcc_nodes,
    "Largest WCC (edges)": largest_wcc_edges,
    "Largest SCC (nodes)": largest_scc_nodes,
    "Largest SCC (edges)": largest_scc_edges,
    "Avg. clustering coefficient": avg_clustering,
    "Number of triangles": num_triangles,
    "Fraction of closed triangles": fraction_closed,
    "Diameter": diameter,
    "Effective diameter": eff_diameter
}

import pandas as pd
df = pd.DataFrame({
    "Metric": ground_truth.keys(),
    "Ground Truth": ground_truth.values(),
    "Your Compute": results.values()
})

print("\n ------------------------------ Comparison Report ---------------------")
print("\n")
print(df)


 ------------------------------ Comparison Report ---------------------


                          Metric  Ground Truth   Your Compute
0                          Nodes    7115.00000    7115.000000
1                          Edges  103689.00000  103689.000000
2            Largest WCC (nodes)    7066.00000    7066.000000
3            Largest WCC (edges)  103663.00000  103663.000000
4            Largest SCC (nodes)    1300.00000    1300.000000
5            Largest SCC (edges)   39456.00000   39456.000000
6    Avg. clustering coefficient       0.14090       0.138652
7            Number of triangles  608389.00000  608389.000000
8   Fraction of closed triangles       0.04564       0.038286
9                       Diameter       7.00000       9.000000
10            Effective diameter       3.80000       4.000000
