# Graph Ranking Algorithms in PySpark

**Author**: Chien-Wei WENG  
**Program**: MSc Data Sciences and Business Analytics (DSBA)  
**Institution**: CentraleSupélec × ESSEC Business School  
**Course**: Scalable Data Algorithms (Fall 2025)  
**Instructor**: Professor Mohamed Ndaoud (ESSEC Business School)

---

## Overview

This notebook implements two fundamental graph ranking algorithms with Apache Spark's RDD API, running them as distributed computations on a randomly generated graph with 1,000 nodes and 8,192 edges.

**Algorithms Implemented:**
1. **PageRank** - Global importance scoring
2. **HITS** - Hub and authority scoring

**Key Technical Features:**
- Sparse matrix representation using edge lists
- Teleportation handling to avoid rank sinks
- RDD caching and lineage management
- Iterative power method with 40 iterations
- Normalization to prevent numerical overflow

## 1. PageRank Algorithm

### Overview
PageRank computes node importance based on incoming link structure.

A node is important if it receives links from other important nodes.

### Formula
```
r = (1-β)/n · 1ₙ + β·M·r
```
where:
- `r`: PageRank vector
- `β`: damping factor (0.8)
- `M`: transition matrix
- `n`: number of nodes (1000)
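
Written per node, the matrix equation above can be read as follows. This assumes `M` is column-stochastic with `M_ji = 1/d_i` whenever `i → j`, where `d_i` (a symbol introduced here for illustration) is the out-degree of node `i`; that matches how the transition probabilities are built later in this notebook.

```latex
r_j = \frac{1-\beta}{n} + \beta \sum_{i \to j} \frac{r_i}{d_i}, \qquad j = 1, \dots, n
```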

### Implementation Approach
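- Initialize: r⁰ = 1/n for every node
- Iterate: r^(i+1) = (1-β)/n · 1ₙ + β·M·r^i for 40 iterations
- Teleport: with probability (1-β) the random walker jumps to a uniformly chosen node
- Dead ends: the graph has none, so no dead-end correction is needed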
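
The steps above can be sketched in plain Python on a tiny graph. This is a minimal illustration, not the Spark implementation used below: the function name `pagerank` and the three-node toy graph are assumptions made for the example, and the code relies on every node having at least one outgoing edge.

```python
def pagerank(edges, n, beta=0.8, iterations=40):
    """Power iteration on an edge list; assumes nodes 1..n and no dead ends."""
    out_degree = {}
    for src, _ in edges:
        out_degree[src] = out_degree.get(src, 0) + 1

    rank = {node: 1.0 / n for node in range(1, n + 1)}
    teleport = (1.0 - beta) / n
    for _ in range(iterations):
        # Every node starts at the teleport share, then receives beta * (M r).
        new_rank = {node: teleport for node in rank}
        for src, dst in edges:
            new_rank[dst] += beta * rank[src] / out_degree[src]
        rank = new_rank
    return rank


# Hypothetical toy graph: 1 -> 2, 1 -> 3, 2 -> 3, 3 -> 1
toy_edges = [(1, 2), (1, 3), (2, 3), (3, 1)]
toy_rank = pagerank(toy_edges, n=3)
print({node: round(score, 4) for node, score in toy_rank.items()})
```

Node 3 receives links from both other nodes, so it should end up with the highest score, and the scores should sum to 1 because no rank mass leaks through dead ends.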

### Import packages

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.7/spark-3.5.7-bin-hadoop3.tgz
!tar zxvf spark-3.5.7-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.7-bin-hadoop3"

import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local")
sc = SparkContext(conf = conf)
print("initialization successful!")

import numpy as np
import random as rn

# `os` is already imported above
seed_value = 0
os.environ['PYTHONHASHSEED'] = str(seed_value)

### 1. Data Loading and Preprocessing

#### 1.1 Load graph edges

In [None]:
# Load the graph data
graph_file = "graph.txt"
raw_data = sc.textFile(graph_file)

print("First five lines of graph:")
for line in raw_data.take(5):
  print(line)

First five lines of graph:
1	2
2	3
3	4
4	5
5	6


In [None]:
# Create pairs (source, destination); node IDs stay as strings here
edges = raw_data.map(lambda line: tuple(line.split()))

print(f"Total edges: {edges.count()}\n")
print("First five edges:")
for edge in edges.take(5):
  print(edge)

Total edges: 8192

First five edges:
('1', '2')
('2', '3')
('3', '4')
('4', '5')
('5', '6')
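
The edges above keep node IDs as strings, while the HITS section later converts them with `int(...)`. A minimal sketch of parsing each line directly into integer pairs is shown here; the helper name `parse_edge` is hypothetical and not part of the notebook.

```python
def parse_edge(line):
    """Split a whitespace-separated 'source destination' line into an (int, int) pair."""
    source, destination = line.split()
    return int(source), int(destination)


# Same tab-separated format as the first lines of graph.txt shown above.
sample_lines = ["1\t2", "2\t3", "3\t4"]
parsed = [parse_edge(line) for line in sample_lines]
print(parsed)  # [(1, 2), (2, 3), (3, 4)]
```

With Spark, such a helper could be passed to `raw_data.map(parse_edge)`. Whichever key type is chosen, it has to match the key type of the rank vector, otherwise the joins silently return nothing.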


#### 1.2 Compute out-degrees

In [None]:
# Calculate the out-degree of each node
# Create pairs (source, out_degree)

out_degree = edges.map(lambda edge: (edge[0], 1)) \
          .reduceByKey(lambda x, y: x + y)

print("Sample out-degrees:")
for node, degree in out_degree.take(5):
  print(f"Node {node}: out-degree = {degree}")

Sample out-degrees:
Node 1: out-degree = 6
Node 2: out-degree = 11
Node 3: out-degree = 9
Node 4: out-degree = 6
Node 5: out-degree = 6
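
For intuition, the `map` + `reduceByKey` pattern above is the distributed analogue of counting keys. A small single-machine sketch on a hypothetical toy edge list (not the notebook's data):

```python
from collections import Counter

# Hypothetical toy edge list of (source, destination) pairs.
toy_edges = [("1", "2"), ("1", "3"), ("2", "3"), ("3", "1")]

# Counting sources mirrors map(lambda edge: (edge[0], 1)) followed by reduceByKey(add).
toy_out_degree = Counter(source for source, _ in toy_edges)
print(dict(toy_out_degree))  # {'1': 2, '2': 1, '3': 1}
```

A useful check is that the out-degrees sum to the number of edges, which would be 8,192 for the graph loaded above.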


In [None]:
# Join edges with out_degrees to get (source, (dest, out_degree))
edges_with_degrees = (edges
            .map(lambda edge: (edge[0], edge[1]))
            .join(out_degree))

#### 1.3 Build transition matrix

In [None]:
# Transition matrix
# Transform to (source, (dest, probability))
transition_max = edges_with_degrees.map(lambda e: (e[0], (e[1][0], 1.0 / e[1][1])))
for source, (dest, prob) in transition_max.take(5):
  print(f"Edge {source} -> {dest}: probability: {prob:.4f}")

Edge 3 -> 4: probability: 0.1111
Edge 3 -> 190: probability: 0.1111
Edge 3 -> 545: probability: 0.1111
Edge 3 -> 562: probability: 0.1111
Edge 3 -> 796: probability: 0.1111
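
Since each edge out of a node carries probability 1/out-degree, the probabilities leaving any source should sum to 1. The sketch below checks this on hypothetical records shaped like the `(source, (dest, probability))` pairs above; the toy values are assumptions for illustration.

```python
from collections import defaultdict
import math

# Hypothetical (source, (dest, probability)) records shaped like transition_max.
toy_transition = [
    ("1", ("2", 0.5)),
    ("1", ("3", 0.5)),
    ("2", ("3", 1.0)),
    ("3", ("1", 1.0)),
]

# Sum the outgoing probabilities per source node.
prob_sum = defaultdict(float)
for source, (_, prob) in toy_transition:
    prob_sum[source] += prob

is_stochastic = all(math.isclose(total, 1.0) for total in prob_sum.values())
print(is_stochastic)  # True
```

On the RDD itself, a comparable check could presumably be written as `transition_max.mapValues(lambda v: v[1]).reduceByKey(lambda x, y: x + y)` and then inspecting the values.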


In [None]:
# Initialize r_0 = 1/n for every node (n = 1000 nodes, IDs "1".."1000")
# Note: the vector has one entry per node, not one per edge
num_nodes = 1000

r_0 = []
for i in range(num_nodes):
  r_0.append((str(i+1), 1/num_nodes)) # (node, rank)

r_0_rdd = sc.parallelize(r_0) # rdd

In [None]:
# Attach the current rank of each source to its outgoing edges
# Create pairs (source, ((dest, prob), rank))
transition_2 = transition_max.join(r_0_rdd)

# Convert to (dest, (source, prob, rank))
transition_3 = transition_2.map(lambda e: (e[1][0][0], (e[0], e[1][0][1], e[1][1])))

transition_3.take(5)

[('6', ('5', 0.16666666666666666, 0.001)),
 ('931', ('5', 0.16666666666666666, 0.001)),
 ('198', ('5', 0.16666666666666666, 0.001)),
 ('394', ('5', 0.16666666666666666, 0.001)),
 ('704', ('5', 0.16666666666666666, 0.001))]

In [None]:
# Calculate contribution to dest
# (dest, (prob x rank))
contribution = trainsition_3.map(lambda e: (e[0], (e[1][1] * e[1][2]))) \
              .reduceByKey(lambda x, y: x + y)

for dest, rank in contribution.take(5):
  print(f"Node {dest}, rank : {rank:.8f}")

Node 704, rank : 0.00084702
Node 453, rank : 0.00086667
Node 761, rank : 0.00125206
Node 868, rank : 0.00057864
Node 298, rank : 0.00180087


### 2. PageRank Implementation

#### 2.1 Initialize rank vector

In [None]:
# Initialize r_1 for all nodes
r_1 = []
for i in range(1000):
  r_1.append((str(i+1), None)) # (node, rank) for all nodes

r_1_rdd = sc.parallelize(r_1) # rdd
r_1_rdd.take(5)

[('1', None), ('2', None), ('3', None), ('4', None), ('5', None)]

#### 2.2 Handle teleportation

In [None]:
# Teleport
beta = 0.8
n = 1000
teleport = (1 - beta) / n

# Update contribution to nodes with incoming
contribution_tele = contribution.mapValues(lambda contrib: teleport + beta * contrib)

print(f"Top 5 nodes before merge")
for node, rank in contribution_tele.takeOrdered(5, key=lambda x: -x[1]):
  print(f"Node {node}, rank : {rank:.8f}")

Top 5 nodes before merge
Node 263, rank : 0.00187590
Node 502, rank : 0.00186635
Node 126, rank : 0.00184313
Node 146, rank : 0.00180125
Node 243, rank : 0.00178063


In [None]:
# Combine all nodes (without incoming + with incoming)
# Left outer join (keeps all nodes from r_1_rdd)
r_i_plus_1 = r_1_rdd.leftOuterJoin(contribution_tele) \
            .mapValues(lambda v: teleport if v[1] is None else v[1])

print(f"Top 5 nodes after merge")
for node, rank in r_i_plus_1.takeOrdered(5, key=lambda x: -x[1]):
  print(f"Node {node}, rank : {rank:.8f}")

print(f"\nBottom 5 nodes after merge")
for node, rank in r_i_plus_1.takeOrdered(5, key=lambda x: x[1]):
  print(f"Node {node}, rank : {rank:.8f}")

Top 5 nodes after merge
Node 263, rank : 0.00187590
Node 502, rank : 0.00186635
Node 126, rank : 0.00184313
Node 146, rank : 0.00180125
Node 243, rank : 0.00178063

Bottom 5 nodes after merge
Node 558, rank : 0.00032987
Node 93, rank : 0.00037143
Node 424, rank : 0.00037273
Node 62, rank : 0.00037273
Node 408, rank : 0.00042154


In [None]:
# Sanity check: r_1_rdd holds every node, contribution_tele only nodes with incoming edges
print(f"All nodes: {r_1_rdd.count()}")
print(f"Nodes with incoming: {contribution_tele.count()}")
print(f"Nodes for flow equation: {r_i_plus_1.count()}")

All nodes: 1000
Nodes with incoming: 1000
Nodes for flow equation: 1000
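
Another sanity check worth considering is that the updated ranks still sum to 1, which should hold whenever there are no dead ends. The sketch below illustrates this with hypothetical toy numbers (three nodes, one of them without incoming edges); the dictionary names are assumptions for the example.

```python
import math

beta, n = 0.8, 3
teleport = (1 - beta) / n

# Hypothetical M·r contributions; node "1" has no incoming edge and is absent.
# The contributions sum to 1 because the toy graph is assumed to have no dead ends.
toy_contribution = {"2": 0.4, "3": 0.6}
all_nodes = ["1", "2", "3"]

# Nodes missing from the contributions receive only the teleport share,
# which is what the leftOuterJoin default does in the cell above.
toy_next_rank = {node: teleport + beta * toy_contribution.get(node, 0.0) for node in all_nodes}
print(math.isclose(sum(toy_next_rank.values()), 1.0))  # True
```

On the RDD, the analogous check would be something like `r_i_plus_1.values().sum()`, which should be close to 1.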


#### 2.3 Iterative computation (40 iterations)

In [None]:
# Transition matrix: (source, (dest, probability))
transition_max = edges_with_degrees.map(lambda e: (e[0], (e[1][0], 1.0 / e[1][1])))
transition_max = transition_max.cache()
transition_max.count()  # Force evaluation and cache it

# Initialize r_0 = 1/n for every node (one entry per node, not per edge)
num_nodes = 1000

r_0 = []
for i in range(num_nodes):
  r_0.append((str(i+1), 1/num_nodes)) # (node, rank)

r_0_rdd = sc.parallelize(r_0) # rdd

# Initialize r_1 for all nodes
r_1 = []
for i in range(1000):
  r_1.append((str(i+1), None)) # (node, rank) for all nodes

r_1_rdd = sc.parallelize(r_1) # rdd


# Initialize teleport
beta = 0.8
n = 1000
teleport = (1 - beta) / n

In [None]:
# Pipeline for 40 iterations

# Start with r_0
current_rank = r_0_rdd

# Iterate 40 times
for iteration in range(40):
  # 1. Join transition_matrix with current_rank: (source, ((dest, prob), rank))
  transition_2 = transition_max.join(current_rank)
  # 2. Convert to (dest, (source, prob, rank))
  transition_3 = transition_2.map(lambda e: (e[1][0][0], (e[0], e[1][0][1], e[1][1])))
  # 3. Compute contribution (M · r): (dest, (prob x rank))
  contribution = transition_3.map(lambda e: (e[0], (e[1][1] * e[1][2]))) \
              .reduceByKey(lambda x, y: x + y)
  # 4. Add teleportation: teleport + beta * contribution
  contribution_tele = contribution.mapValues(lambda contrib: teleport + beta * contrib)
  # 5. Handle nodes without incoming edges (leftOuterJoin)
  r_i_plus_1 = r_1_rdd.leftOuterJoin(contribution_tele) \
            .mapValues(lambda v: teleport if v[1] is None else v[1])

  # Cache new result and unpersist old
  r_i_plus_1.cache()
  r_i_plus_1.count()  # Force evaluation
  current_rank.unpersist()

  # 6. Update current_rank for next iteration
  current_rank = r_i_plus_1  # Result becomes input for next iteration

  if iteration % 10 == 0:
    print(f"Iteration {iteration} complete")

Iteration 0 complete
Iteration 10 complete
Iteration 20 complete
Iteration 30 complete


### 3. PageRank Results

#### 3.1 Top 5 ranked nodes

In [None]:
print(f"Top 5 nodes after 40 iterations")
for node, rank in current_rank.takeOrdered(5, key=lambda x: -x[1]):
  print(f"Node {node}, rank : {rank:.8f}")

Top 5 nodes after 40 iterations
Node 263, rank : 0.00201742
Node 537, rank : 0.00194384
Node 965, rank : 0.00189436
Node 243, rank : 0.00184955
Node 187, rank : 0.00183004


#### 3.2 Bottom 5 ranked nodes

In [None]:
print(f"Bottom 5 nodes after 40 iterations")
for node, rank in current_rank.takeOrdered(5, key=lambda x: x[1]):
  print(f"Node {node}, rank : {rank:.8f}")

Bottom 5 nodes after 40 iterations
Node 558, rank : 0.00032955
Node 93, rank : 0.00035143
Node 424, rank : 0.00035484
Node 62, rank : 0.00036131
Node 408, rank : 0.00038759


## 2. HITS Algorithm

### Overview
HITS distinguishes between two complementary types of important pages:
- **Hubs**: Pages that link to many authoritative pages
- **Authorities**: Pages that receive links from many good hubs

### Formulas
```
h = λL·a   (hub score from authority of outlinks)
a = μL^T·h (authority score from hub quality of inlinks)
```
where:
- `h`: hubbiness vector
- `a`: authority vector
- `L`: link matrix (L_ij = 1 if i→j)
- `λ, μ`: scaling factors (both = 1)
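
Entry by entry, with `L_ij = 1` if `i → j`, the two updates above can be written as sums over neighbours. This is a direct restatement of the matrix formulas, using the same symbols:

```latex
a_j = \mu \sum_{i \,:\, i \to j} h_i, \qquad h_i = \lambda \sum_{j \,:\, i \to j} a_j
```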

### Implementation Approach
- Initialize: h⁰ = all 1's
- Iterate 40 times:
  1. Compute a = μL^T·h, normalize to max=1
  2. Compute h = λL·a, normalize to max=1
- Mutual reinforcement: hubs ↔ authorities
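
The loop above can be sketched in plain Python on a tiny graph. This is an illustrative single-machine version under the same choices (λ = μ = 1, max-normalization), not the Spark code used below; the function name `hits` and the toy graph are assumptions for the example.

```python
def hits(edges, nodes, iterations=40):
    """HITS on an edge list with max-normalization; lambda = mu = 1 is assumed."""
    hub = {node: 1.0 for node in nodes}
    authority = {node: 0.0 for node in nodes}
    for _ in range(iterations):
        # a = L^T h: each destination sums the hub scores of its sources.
        authority = {node: 0.0 for node in nodes}
        for src, dst in edges:
            authority[dst] += hub[src]
        max_a = max(authority.values())
        authority = {node: score / max_a for node, score in authority.items()}

        # h = L a: each source sums the authority scores of its destinations.
        hub = {node: 0.0 for node in nodes}
        for src, dst in edges:
            hub[src] += authority[dst]
        max_h = max(hub.values())
        hub = {node: score / max_h for node, score in hub.items()}
    return hub, authority


# Hypothetical toy graph: node 1 links to 2 and 3, node 2 links to 3.
toy_hub, toy_authority = hits([(1, 2), (1, 3), (2, 3)], nodes=[1, 2, 3])
print(toy_hub, toy_authority)
```

In this toy graph node 1 is the best hub (it points to both authorities) and node 3 the best authority (both hubs point to it), so each gets the normalized score 1.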

### Link Matrix Construction

In [None]:
# Reuse the same edges from PageRank
# L has structure: (source, dest) where Lij = 1 if i→j

# Load the graph data
graph_file = "graph.txt"
raw_data = sc.textFile(graph_file) # rdd

# Create pairs (source, destination)
L = raw_data.map(lambda line: tuple(line.split()))

print(f"Total links: {L.count()}\n")
print(f"First five links: \n{L.take(5)}")

Total links: 8192

First five links: 
[('1', '2'), ('2', '3'), ('3', '4'), ('4', '5'), ('5', '6')]


### Initialize Vectors

In [None]:
n = 1000
Lambda = 1 # Scaling factor for hubs
mu = 1 # Scaling factor for authorities

# Initialize h with all 1's (hubbiness vector)
# Node IDs in graph.txt run from 1 to n, so the keys must be i + 1, not i
h_vector = []
for i in range(n):
  h_vector.append((i + 1, 1)) # (node, initial_hub_score)

h = sc.parallelize(h_vector) # rdd

- Compute the authority vector $a = \mu L^T h$: entry $a_j$ is row $j$ of $L^T$ (that is, column $j$ of $L$) times $h$.
- Result: $a_j$ is the sum of $h_i$ over all nodes $i$ that point to $j$.

In [None]:
# Join L with h on source
L_with_h = L.map(lambda edge: (int(edge[0]), int(edge[1]))) \
        .join(h) # (source, (dest, h_value))

# Map to (dest, h_value) - contribution to destination's authority
contributions_to_dest = L_with_h.map(lambda x: (x[1][0], x[1][1])) # (dest, h_value)

# Sum contribution per destination
a_unnormalized = contributions_to_dest.reduceByKey(lambda x, y: x + y)

# Scale by mu and normalize
max_a = a_unnormalized.map(lambda x: x[1]).max()
a = a_unnormalized.mapValues(lambda v: mu * v / max_a)

- Compute the hubbiness vector $h = \lambda L a$: entry $h_i$ is row $i$ of $L$ times $a$.
- Result: $h_i$ is the sum of $a_j$ over all nodes $j$ that $i$ points to.

In [None]:
# Join L with a on destination
L_with_a = L.map(lambda edge: (int(edge[1]), int(edge[0]))) \
        .join(a) # (dest, (source, a_value))

# Map to (source, a_value) - contribution to source's hub
contributions_to_source = L_with_a.map(lambda x: (x[1][0], x[1][1])) # (source, a_value)

# Sum contribution per source
h_unnormalized = contributions_to_source.reduceByKey(lambda x, y: x + y)

# Scale by Lambda and normalize
max_h = h_unnormalized.map(lambda x: x[1]).max()
h = h_unnormalized.mapValues(lambda v: Lambda * v / max_h)

### Iterative Computation (40 iterations)

In [None]:
# Pipeline for 40 iterations
L = L.cache()
L.count()

# Start from the hub vector h computed in the cell above
# (it is already one update past the all-ones initialization)
current_h = h
current_a = None

# Iterate 40 times
for iteration in range(40):
  # Step 1: Compute authority a = μL^T·h
  # L^T·h means: for each dest, sum h values of sources pointing to it
  L_with_h = L.map(lambda edge: (int(edge[0]), int(edge[1]))) \
        .join(current_h) # (source, (dest, h_value))

  contributions_to_dest = L_with_h.map(lambda x: (x[1][0], x[1][1])) # (dest, h_value)
  a_unnormalized = contributions_to_dest.reduceByKey(lambda x, y: x + y)

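  # Normalize: max value = 1
  max_a = a_unnormalized.map(lambda x: x[1]).max()
  previous_a = current_a
  current_a = a_unnormalized.mapValues(lambda v: mu * v / max_a)

  # Cache the new authority vector, then release the previous one
  current_a.cache()
  current_a.count()
  if previous_a is not None:
    previous_a.unpersist()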


  # Step 2: Compute hubbiness h = λL·a
  # L·a means: for each source, sum a values of dests it points to
  L_with_a = L.map(lambda edge: (int(edge[1]), int(edge[0]))) \
          .join(current_a)

  contributions_to_source = L_with_a.map(lambda x: (x[1][0], x[1][1])) # (source, a_value)
  h_unnormalized = contributions_to_source.reduceByKey(lambda x, y: x + y)

  # Normalize: max value = 1
  max_h = h_unnormalized.map(lambda x: x[1]).max()
  new_h = h_unnormalized.mapValues(lambda v: Lambda * v / max_h)
  new_h.cache()
  new_h.count()


  # Unpersist old values
  if current_h:
    current_h.unpersist()
  current_h = new_h

  if iteration % 10 == 0:
    print(f"Iteration {iteration} complete")

Iteration 0 complete
Iteration 10 complete
Iteration 20 complete
Iteration 30 complete


### HITS Results

- Top 5 hubs
- Bottom 5 hubs

In [None]:
print(f"Top 5 nodes with the highest hubbiness score")
for node, score in current_h.takeOrdered(5, key=lambda x: -x[1]):
  print(f"Node {node}, hubbiness score : {score:.8f}")

print(f"Bottom 5 nodes with the lowest hubbiness score")
for node, score in current_h.takeOrdered(5, key=lambda x: x[1]):
  print(f"Node {node}, hubbiness score : {score:.8f}")

Top 5 nodes with the highest hubbiness score
Node 840, hubbiness score : 1.00000000
Node 155, hubbiness score : 0.87581037
Node 234, hubbiness score : 0.83653755
Node 389, hubbiness score : 0.79747129
Node 472, hubbiness score : 0.79615526
Bottom 5 nodes with the lowest hubbiness score
Node 23, hubbiness score : 0.03903892
Node 835, hubbiness score : 0.05302275
Node 141, hubbiness score : 0.05935309
Node 539, hubbiness score : 0.06309816
Node 889, hubbiness score : 0.07042761


- Top 5 authorities
- Bottom 5 authorities

In [None]:
print(f"Top 5 nodes with the highest authority score")
for node, score in current_a.takeOrdered(5, key=lambda x: -x[1]):
  print(f"Node {node}, authority score : {score:.8f}")

print(f"Bottom 5 nodes with the lowest authority score")
for node, score in current_a.takeOrdered(5, key=lambda x: x[1]):
  print(f"Node {node}, authority score : {score:.8f}")

Top 5 nodes with the highest authority score
Node 893, authority score : 1.00000000
Node 16, authority score : 0.96776625
Node 799, authority score : 0.95135153
Node 146, authority score : 0.94164366
Node 473, authority score : 0.90610856
Bottom 5 nodes with the lowest authority score
Node 19, authority score : 0.05837246
Node 135, authority score : 0.06686172
Node 462, authority score : 0.07511939
Node 24, authority score : 0.08360036
Node 910, authority score : 0.08543487


## Comparison: PageRank vs HITS

### Algorithm Differences

| Aspect | PageRank | HITS |
|--------|----------|------|
| **Output** | Single importance score | Hub + Authority scores |
| **Top Node** | 263 (score 0.00202) | Hub: 840, Authority: 893 (each normalized to 1.0) |
| **Concept** | Global importance via random walk | Mutual reinforcement of roles |
| **Use Case** | Overall page importance | Distinguishing sources vs destinations |
| **Computation** | 1 vector update per iteration | 2 vector updates per iteration |
| **Per-iteration cost** | Lower (one matrix-vector product) | Roughly 2× (two matrix-vector products plus two max-normalizations); not benchmarked here |

### Key Insight

Different nodes rank highly in different algorithms, showing they measure complementary aspects:
- **PageRank**: Global importance (who should I visit?)
- **HITS Hubs**: Quality as connector (who knows good resources?)
- **HITS Authorities**: Quality as destination (who is the expert?)
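
The claim that different nodes rank highly can be checked directly against the top-5 lists reported in the result cells. The sketch below copies those node IDs by hand and measures the overlap with Jaccard similarity; the helper `jaccard` is introduced here for illustration.

```python
# Top-5 node IDs copied from the PageRank and HITS result cells of this notebook.
pagerank_top5 = {263, 537, 965, 243, 187}
hub_top5 = {840, 155, 234, 389, 472}
authority_top5 = {893, 16, 799, 146, 473}


def jaccard(a, b):
    """Jaccard similarity of two sets: size of intersection over size of union."""
    return len(a & b) / len(a | b)


print(jaccard(pagerank_top5, hub_top5))        # 0.0
print(jaccard(pagerank_top5, authority_top5))  # 0.0
print(jaccard(hub_top5, authority_top5))       # 0.0
```

All three top-5 sets are disjoint, which supports the reading that the three scores capture different aspects of the graph. A top-5 comparison is only a coarse indicator, though; a rank correlation over all 1,000 nodes would give a fuller picture.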

## Key Learning Outcomes

### Distributed Systems Concepts
- MapReduce paradigm (map, reduce, join operations)
- Trade-offs between materialization and recomputation
- Data locality and partitioning importance

### Algorithmic Insights
- Why teleportation is crucial for PageRank convergence
- How sparse matrix representations save memory
- Mutually reinforcing relationships in HITS
- Normalization prevents numerical overflow
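
To make the memory point concrete, the arithmetic below compares how many entries a dense matrix and an edge list would hold for this graph. It counts entries only; the actual bytes per entry depend on the representation and are not estimated here.

```python
n_nodes = 1000   # graph size stated in the Overview
n_edges = 8192   # edge count stated in the Overview

dense_entries = n_nodes * n_nodes   # every (i, j) cell stored explicitly
sparse_entries = n_edges            # one record per existing edge
density = sparse_entries / dense_entries

print(dense_entries, sparse_entries)  # 1000000 8192
print(f"{density:.4%}")               # 0.8192%
```

Fewer than 1% of the matrix cells are non-zero, so the edge list stores roughly 122 times fewer entries than a dense matrix would.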

### Performance Optimization
- Strategic use of `.cache()` and `.persist()`
- RDD lineage management to avoid deep dependencies
- Avoiding shuffle operations where possible

## Potential Extensions

1. **Personalized PageRank**: Bias teleportation toward specific nodes
2. **Convergence Detection**: Stop when changes fall below threshold
3. **Visualization**: Graph with node sizes proportional to scores
4. **Scalability Testing**: Benchmark on 10K, 100K, 1M node graphs
5. **Topic-Sensitive HITS**: Weight edges by content similarity
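
As an illustration of the convergence-detection extension, the sketch below stops iterating once the L1 change between successive rank vectors drops below a threshold. It is a plain-Python sketch under stated assumptions: the names `l1_distance`, `iterate_until_converged`, and `toy_step`, the tolerance, and the two-node toy graph are all hypothetical.

```python
def l1_distance(old_rank, new_rank):
    """Sum of absolute per-node differences between two rank dictionaries."""
    return sum(abs(new_rank[node] - old_rank[node]) for node in new_rank)


def iterate_until_converged(step, initial_rank, tolerance=1e-8, max_iterations=100):
    """Apply `step` repeatedly until the L1 change drops below `tolerance`."""
    rank = initial_rank
    for iteration in range(1, max_iterations + 1):
        new_rank = step(rank)
        if l1_distance(rank, new_rank) < tolerance:
            return new_rank, iteration
        rank = new_rank
    return rank, max_iterations


# Hypothetical 2-node cycle (1 -> 2, 2 -> 1) with beta = 0.8, so teleport = 0.1.
def toy_step(rank, beta=0.8):
    return {1: 0.1 + beta * rank[2], 2: 0.1 + beta * rank[1]}


final_rank, used = iterate_until_converged(toy_step, {1: 0.9, 2: 0.1})
print(used, final_rank)
```

In the Spark version, the distance would probably be computed by joining the old and new rank RDDs and summing the absolute differences, at the cost of one extra join per iteration.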