<a href="https://colab.research.google.com/github/sridhartroy/AIML/blob/main/MapReduce_FriendRecommender.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark==3.5.6

Collecting pyspark==3.5.6
  Downloading pyspark-3.5.6.tar.gz (317.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.4/317.4 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.6-py2.py3-none-any.whl size=317895798 sha256=bbb9c3c40d2e4e5be92b9d612456cd20d624c387cd3dd8bcdef668f6f2c8698a
  Stored in directory: /root/.cache/pip/wheels/64/62/f3/ec15656ea4ada0523cae62a1827fe7beb55d3c8c87174aad4a
Successfully built pyspark
Installing collected packages: pyspark
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.1
    Uninstalling pyspark-3.5.1:
      Successfully uninstalled pyspark-3.5.1
Successfully installed pyspark-3.5.6


In [2]:
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder \
    .appName("MapReduce-FriendRecommender") \
    .getOrCreate()

sc = spark.sparkContext

In [3]:
!wget https://an-ml.s3.us-west-1.amazonaws.com/soc-LiveJournal1Adj.txt

--2025-10-27 22:03:35--  https://an-ml.s3.us-west-1.amazonaws.com/soc-LiveJournal1Adj.txt
Resolving an-ml.s3.us-west-1.amazonaws.com (an-ml.s3.us-west-1.amazonaws.com)... 52.219.194.98, 52.219.216.26, 52.219.117.49, ...
Connecting to an-ml.s3.us-west-1.amazonaws.com (an-ml.s3.us-west-1.amazonaws.com)|52.219.194.98|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4156181 (4.0M) [text/plain]
Saving to: ‘soc-LiveJournal1Adj.txt’


2025-10-27 22:03:36 (9.14 MB/s) - ‘soc-LiveJournal1Adj.txt’ saved [4156181/4156181]



In [None]:
friends = sc.textFile("soc-LiveJournal1Adj.txt")

In [4]:
# Tiny example

lines = sc.parallelize([
    "1\t2,3,4,5,6",
    "2\t1,3,5,6",
    "3\t1,2,4,6",
    "4\t1,3,5",
    "5\t,1,2,4,6",
    "6\t1,2,3,5"
  ])


In [5]:
lines.take(3)

['1\t2,3,4,5,6', '2\t1,3,5,6', '3\t1,2,4,6']

In [6]:
# parse the file

def parse_line(line):
    parts = line.split("\t")
    #print(len(parts))
    user = parts[0]
    friends = parts[1].split(",") if len(parts) > 1 and parts[1] else []
    friends = [f for f in friends if f and f != user]  # drop empties/self
    return (user, friends)

In [7]:
print("STEP 0 - Original List:")
print(lines.collect())

adj = lines.map(parse_line).cache()
print("STEP 1 — parsed adjacency: WHO ARE MY FRIENDS?")
for x in adj.collect(): print(x)

STEP 0 - Original List:
['1\t2,3,4,5,6', '2\t1,3,5,6', '3\t1,2,4,6', '4\t1,3,5', '5\t,1,2,4,6', '6\t1,2,3,5']
STEP 1 — parsed adjacency: WHO ARE MY FRIENDS?
('1', ['2', '3', '4', '5', '6'])
('2', ['1', '3', '5', '6'])
('3', ['1', '2', '4', '6'])
('4', ['1', '3', '5'])
('5', ['1', '2', '4', '6'])
('6', ['1', '2', '3', '5'])


In [8]:
inv = adj.flatMap(lambda uf: [(f, uf[0]) for f in uf[1]])
inv_grouped = inv.groupByKey().mapValues(lambda vals: sorted(set(vals)))
print(inv.take(30))
print(inv.groupByKey().take(30))
print(inv_grouped.take(30))

#print(inv_grouped.sortByKey(ascending=True).take(20))

inv_grouped = inv_grouped.sortByKey(ascending=True).cache()

print(inv_grouped.take(30))

print("\nSTEP 2 — inverted index: WHO SHARES THIS FRIEND?")
for f, users in inv_grouped.collect(): print(f, ":", users)


[('2', '1'), ('3', '1'), ('4', '1'), ('5', '1'), ('6', '1'), ('1', '2'), ('3', '2'), ('5', '2'), ('6', '2'), ('1', '3'), ('2', '3'), ('4', '3'), ('6', '3'), ('1', '4'), ('3', '4'), ('5', '4'), ('1', '5'), ('2', '5'), ('4', '5'), ('6', '5'), ('1', '6'), ('2', '6'), ('3', '6'), ('5', '6')]
[('3', <pyspark.resultiterable.ResultIterable object at 0x7e4ddd6c11f0>), ('4', <pyspark.resultiterable.ResultIterable object at 0x7e4ddd6c3770>), ('5', <pyspark.resultiterable.ResultIterable object at 0x7e4ddcbba480>), ('6', <pyspark.resultiterable.ResultIterable object at 0x7e4ddcdd5190>), ('2', <pyspark.resultiterable.ResultIterable object at 0x7e4ddcbb8e00>), ('1', <pyspark.resultiterable.ResultIterable object at 0x7e4ddcbb92b0>)]
[('3', ['1', '2', '4', '6']), ('4', ['1', '3', '5']), ('5', ['1', '2', '4', '6']), ('6', ['1', '2', '3', '5']), ('2', ['1', '3', '5', '6']), ('1', ['2', '3', '4', '5', '6'])]
[('1', ['2', '3', '4', '5', '6']), ('2', ['1', '3', '5', '6']), ('3', ['1', '2', '4', '6']), ('4'

In [9]:
from itertools import combinations

In [10]:
# Emit all unordered user pairs per friend (+1 each)

pairs_by_dim = inv_grouped.flatMap(
    lambda fv: [((a, b), 1) for a, b in combinations(fv[1], 2)]
).cache()

print("\nSTEP 3 — pair contributions (+1 per shared friend):")
for p in pairs_by_dim.collect(): print(p)


STEP 3 — pair contributions (+1 per shared friend):
(('2', '3'), 1)
(('2', '4'), 1)
(('2', '5'), 1)
(('2', '6'), 1)
(('3', '4'), 1)
(('3', '5'), 1)
(('3', '6'), 1)
(('4', '5'), 1)
(('4', '6'), 1)
(('5', '6'), 1)
(('1', '3'), 1)
(('1', '5'), 1)
(('1', '6'), 1)
(('3', '5'), 1)
(('3', '6'), 1)
(('5', '6'), 1)
(('1', '2'), 1)
(('1', '4'), 1)
(('1', '6'), 1)
(('2', '4'), 1)
(('2', '6'), 1)
(('4', '6'), 1)
(('1', '3'), 1)
(('1', '5'), 1)
(('3', '5'), 1)
(('1', '2'), 1)
(('1', '4'), 1)
(('1', '6'), 1)
(('2', '4'), 1)
(('2', '6'), 1)
(('4', '6'), 1)
(('1', '2'), 1)
(('1', '3'), 1)
(('1', '5'), 1)
(('2', '3'), 1)
(('2', '5'), 1)
(('3', '5'), 1)


In [11]:
# 4) Sum contributions (this is the sparse dot-product)

mutual_counts = pairs_by_dim.reduceByKey(lambda a, b: a + b).sortByKey(ascending=True).cache()
print(mutual_counts.take(20))

print("\nSTEP 4 — mutual friend counts:")
for pair, cnt in mutual_counts.collect(): print(pair, ":", cnt)

[(('1', '2'), 3), (('1', '3'), 3), (('1', '4'), 2), (('1', '5'), 3), (('1', '6'), 3), (('2', '3'), 2), (('2', '4'), 3), (('2', '5'), 2), (('2', '6'), 3), (('3', '4'), 1), (('3', '5'), 4), (('3', '6'), 2), (('4', '5'), 1), (('4', '6'), 3), (('5', '6'), 2)]

STEP 4 — mutual friend counts:
('1', '2') : 3
('1', '3') : 3
('1', '4') : 2
('1', '5') : 3
('1', '6') : 3
('2', '3') : 2
('2', '4') : 3
('2', '5') : 2
('2', '6') : 3
('3', '4') : 1
('3', '5') : 4
('3', '6') : 2
('4', '5') : 1
('4', '6') : 3
('5', '6') : 2


In [12]:
# Existing friends and remove

#print first the adjacency list
print("Original Adjacency : ")
print(adj.take(20))

edges_unordered = (adj
    .flatMap(lambda uf: [tuple(sorted((uf[0], f))) for f in uf[1]])
    .distinct()
    .collect()
)

edges_unordered.sort()

print("\nExisting Friends :")
print(edges_unordered)

b_edges = sc.broadcast(set(edges_unordered))

# print(b_edges.value)

print("\nMutual Counts :")
print(mutual_counts.take(20))

nonfriends = mutual_counts.filter(lambda kv: kv[0] not in b_edges.value).cache()

print("\nSTEP 5 — non-friend candidate pairs (connect_score = mutuals):")
for p in nonfriends.collect(): print(p)

Original Adjacency : 
[('1', ['2', '3', '4', '5', '6']), ('2', ['1', '3', '5', '6']), ('3', ['1', '2', '4', '6']), ('4', ['1', '3', '5']), ('5', ['1', '2', '4', '6']), ('6', ['1', '2', '3', '5'])]

Existing Friends :
[('1', '2'), ('1', '3'), ('1', '4'), ('1', '5'), ('1', '6'), ('2', '3'), ('2', '5'), ('2', '6'), ('3', '4'), ('3', '6'), ('4', '5'), ('5', '6')]

Mutual Counts :
[(('1', '2'), 3), (('1', '3'), 3), (('1', '4'), 2), (('1', '5'), 3), (('1', '6'), 3), (('2', '3'), 2), (('2', '4'), 3), (('2', '5'), 2), (('2', '6'), 3), (('3', '4'), 1), (('3', '5'), 4), (('3', '6'), 2), (('4', '5'), 1), (('4', '6'), 3), (('5', '6'), 2)]

STEP 5 — non-friend candidate pairs (connect_score = mutuals):
(('2', '4'), 3)
(('3', '5'), 4)
(('4', '6'), 3)


In [13]:
# 6) Fan-out to directed candidates

directed = nonfriends.flatMap(
    lambda kv: [
        (kv[0][0], (kv[0][1], kv[1])),  # A -> (B, score)
        (kv[0][1], (kv[0][0], kv[1]))   # B -> (A, score)
    ]
).cache()

print("\nSTEP 6 — directed candidates (user -> (friend_to_be_recommended, connect_score)):")
for d in directed.collect(): print(d)



STEP 6 — directed candidates (user -> (friend_to_be_recommended, connect_score)):
('2', ('4', 3))
('4', ('2', 3))
('3', ('5', 4))
('5', ('3', 4))
('4', ('6', 3))
('6', ('4', 3))


In [14]:
# 7) Group per user and take Top-10 (score desc, tie-break by ID asc)

def topk(recs, k=1):
    # recs is iterable of (candidate, score); sort by (-score, candidate)
    return [v for v, c in sorted(recs, key=lambda x: (-x[1], x[0]))[:k]]

top10 = directed.groupByKey().mapValues(lambda recs: topk(recs, 10)).cache()

print("\nSTEP 7 — top recommendations per user:")
for u, lst in top10.collect(): print(u, ":", lst)


#print(directed.groupByKey().take(20))
print(type(top10))


STEP 7 — top recommendations per user:
4 : ['2', '6']
3 : ['5']
5 : ['3']
6 : ['4']
2 : ['4']
<class 'pyspark.rdd.PipelinedRDD'>


In [15]:
# Print All

all_users = adj.map(lambda uf: uf[0]).distinct()

print("\nSample from all_users:")
print(all_users.sortBy(lambda x:x).take(10)) # Print first 10 elements


print("\nSample from top10:")
print(top10.sortByKey(ascending=True).take(10)) # Print first 10 elements

# Collect top10 into a dictionary and broadcast it
top10_dict = dict(top10.collect())
broadcast_top10 = sc.broadcast(top10_dict)

# Map over all_users to get recommendations using the broadcasted dictionary
final = (all_users
         .map(lambda user: (user, broadcast_top10.value.get(user, [])))
         .map(lambda uv: f"{uv[0]}\t{','.join(uv[1])}").sortBy(lambda x:x))

print("\nSTEP 8 — final lines (User\\tFriend_Rec1,Friend_Rec2,...):")
for line in final.collect(): print(line)

print(type(final))


Sample from all_users:
['1', '2', '3', '4', '5', '6']

Sample from top10:
[('2', ['4']), ('3', ['5']), ('4', ['2', '6']), ('5', ['3']), ('6', ['4'])]

STEP 8 — final lines (User\tFriend_Rec1,Friend_Rec2,...):
1	
2	4
3	5
4	2,6
5	3
6	4
<class 'pyspark.rdd.PipelinedRDD'>


In [16]:
for k in range(1,11):
  print("Sample ", k, ": TOP 10 Recommendations for user selected based on count of mutual friends : ")
  for line in final.takeSample(False,5, seed=k): print(line)
  print("\n")

Sample  1 : TOP 10 Recommendations for user selected based on count of mutual friends : 
3	5
4	2,6
6	4
1	
5	3


Sample  2 : TOP 10 Recommendations for user selected based on count of mutual friends : 
3	5
4	2,6
2	4
5	3
6	4


Sample  3 : TOP 10 Recommendations for user selected based on count of mutual friends : 
1	
3	5
4	2,6
6	4
5	3


Sample  4 : TOP 10 Recommendations for user selected based on count of mutual friends : 
4	2,6
6	4
5	3
1	
3	5


Sample  5 : TOP 10 Recommendations for user selected based on count of mutual friends : 
2	4
1	
4	2,6
6	4
3	5


Sample  6 : TOP 10 Recommendations for user selected based on count of mutual friends : 
3	5
6	4
2	4
4	2,6
1	


Sample  7 : TOP 10 Recommendations for user selected based on count of mutual friends : 
5	3
1	
6	4
4	2,6
2	4


Sample  8 : TOP 10 Recommendations for user selected based on count of mutual friends : 
6	4
5	3
1	
4	2,6
3	5


Sample  9 : TOP 10 Recommendations for user selected based on count of mutual friends : 
6	4
1	
2	4
3	5

In [17]:
for k in range(1,10):
  sample_users = final.takeSample(False, 5, seed=k)
  print(sample_users)

['3\t5', '4\t2,6', '6\t4', '1\t', '5\t3']
['3\t5', '4\t2,6', '2\t4', '5\t3', '6\t4']
['1\t', '3\t5', '4\t2,6', '6\t4', '5\t3']
['4\t2,6', '6\t4', '5\t3', '1\t', '3\t5']
['2\t4', '1\t', '4\t2,6', '6\t4', '3\t5']
['3\t5', '6\t4', '2\t4', '4\t2,6', '1\t']
['5\t3', '1\t', '6\t4', '4\t2,6', '2\t4']
['6\t4', '5\t3', '1\t', '4\t2,6', '3\t5']
['6\t4', '1\t', '2\t4', '3\t5', '5\t3']
