
# Question 1

In [None]:
from pyspark import SparkConf, SparkContext


In [None]:
# Initialize Spark Context
conf = SparkConf().set("spark.executor.memory", "8g")
sc = SparkContext(conf=conf)


In [None]:
def parse_line(line):
    """Parses a line from the input file and returns (user, list_of_friends)"""
    parts = line.split("\t")
    user = int(parts[0])
    friends = list(map(int, parts[1].split(","))) if len(parts) > 1 and parts[1] else []
    return user, friends


In [None]:
from itertools import combinations


def generate_mutual_friend_pairs(user, friends):
    """Emits ((a, b), 1) for every unordered pair of `user`'s friends.

    Each emitted record means that a and b have `user` as one mutual friend.
    `user` itself does not appear in the output.
    """
    # Sort each pair so that (a, b) and (b, a) reduce to the same key.
    return [(tuple(sorted(pair)), 1) for pair in combinations(friends, 2)]


In [None]:
def recommend_friends(user, friends, mutual_friends_count):
    """Generates friend recommendations for a user based on mutual friends."""
    existing_friends = set(friends)
    recommendations = [(other_user, count) for (other_user, count) in mutual_friends_count.items() if other_user not in existing_friends]
    recommendations.sort(key=lambda x: (-x[1], x[0]))  # Sort by descending count, then ascending ID
    return user, [rec[0] for rec in recommendations[:10]]



In [None]:
# Load and parse the data
lines = sc.textFile("/content/soc-LiveJournal1Adj.txt")  # Replace with actual file path
user_friends = lines.map(parse_line)


In [None]:
user_friends.take(1)

[(0,
  [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
   21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
   41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60,
   61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80,
   81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94])]

In [None]:
# Generate mutual friend counts
mutual_friend_pairs = user_friends.flatMap(lambda x: generate_mutual_friend_pairs(x[0], x[1]))
mutual_friend_counts = mutual_friend_pairs.reduceByKey(lambda x, y: x + y)


In [None]:
# Restructure the data for recommendations
mutual_friends_by_user = mutual_friend_counts.flatMap(lambda x: [(x[0][0], (x[0][1], x[1])), (x[0][1], (x[0][0], x[1]))])
mutual_friends_grouped = mutual_friends_by_user.groupByKey().mapValues(dict)


In [None]:
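# Generate recommendations.
# Note: join() is an inner join, so a user who shares no mutual friend with
# anyone has no entry in mutual_friends_grouped and is dropped from the output.
recommendations = user_friends.join(mutual_friends_grouped).map(lambda x: recommend_friends(x[0], x[1][0], x[1][1]))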


In [None]:
# Spot-check the recommendations for a single user
test_user_ids = [11]
test_result = recommendations.filter(lambda x: x[0] in test_user_ids).collect()
print(test_result)

[(11, [27552, 7785, 27573, 27574, 27589, 27590, 27600, 27617, 27620, 27667])]


In [None]:
# Collect recommendations for specified user IDs
user_ids = [1376, 1377, 1210, 9018, 9033, 9040, 9999, 9910, 9902, 9993]
results = recommendations.filter(lambda x: x[0] in user_ids).collect()


In [None]:
# Print results
for user, recs in results:
    print(f"{user}\t{','.join(map(str, recs))}")

sc.stop()


1376	1382,6487,1356,1366,2409,4498,5307,8737,12700,16486
9040	9033,9039,9025,9027,9031,9032,9034,9036,503,9029
1377	1349,1382,1387,1390,1405,1410,1428,33772,1352,1354
9033	9039,9040,9025,9026,9027,9031,9032,9034,9036,503
9993	9991,13134,13478,13877,34299,34485,34642,37941
9018	9016,9017,317,9023
9902	9906,18845,1797,9891,9894,18626,24136,236,319,351
1210	1158,1204,1233,1169,1223,1159,1167,1175,1182,1183
9910	351,622,2554,7651,9920,22338,30169,30403,45111,47577
9999	36764,44132,10058,44088,36765,36909,10055,44068,44076,10000


## Writeup

This recommendation system follows the MapReduce pattern, implemented in PySpark. The pipeline first parses each input line into a (user, friend list) record. A flatMap transformation then emits, for each user, every unordered pair of that user's friends; each emitted pair stands for two users who have that user as a mutual friend. The reduceByKey operation sums these records, giving the number of mutual friends for each pair. Pairs that are already direct friends are counted at this stage as well. Next, the counts are regrouped by user, so that each user maps to a dictionary of candidates and their mutual friend counts, and this is joined with the adjacency list so that existing friends can be filtered out. Finally, the remaining candidates are sorted by mutual friend count (descending), with ties broken by user ID (ascending), and up to 10 recommendations are returned per user; users with fewer than 10 candidates, such as 9018 and 9993 above, get a shorter list. Because every stage is a Spark transformation over partitioned data, the work is distributed across executors, although the pair generation step is quadratic in the length of each friend list.
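
To make the steps concrete, here is a minimal plain-Python sketch of the same logic on a tiny graph. It is only an illustration, not the Spark job itself: `toy_graph` and `recommend_all` are hypothetical names invented for this example, the data is made up rather than taken from the LiveJournal file, and a `Counter` stands in for `reduceByKey`.

```python
from collections import Counter, defaultdict
from itertools import combinations

# Hypothetical toy adjacency list (symmetric friendships), for illustration only.
toy_graph = {
    0: [1, 2, 3],
    1: [0, 2, 5],
    2: [0, 1, 5],
    3: [0, 4],
    4: [3],
    5: [1, 2],
}


def recommend_all(graph, top_n=10):
    """Returns {user: [recommended user IDs]} for every user in `graph`."""
    # flatMap + reduceByKey equivalent: every pair of a user's friends
    # gains one mutual friend (that user).
    pair_counts = Counter()
    for friends in graph.values():
        for a, b in combinations(sorted(friends), 2):
            pair_counts[(a, b)] += 1

    # Regroup by user: user -> {candidate: mutual_friend_count}.
    candidates = defaultdict(dict)
    for (a, b), count in pair_counts.items():
        candidates[a][b] = count
        candidates[b][a] = count

    # Drop existing friends, then rank by count (descending) and ID (ascending).
    result = {}
    for user, friends in graph.items():
        existing = set(friends)
        ranked = sorted(
            ((other, n) for other, n in candidates.get(user, {}).items()
             if other not in existing),
            key=lambda rec: (-rec[1], rec[0]),
        )
        result[user] = [other for other, _ in ranked[:top_n]]
    return result


toy_recommendations = recommend_all(toy_graph)
for user, recs in sorted(toy_recommendations.items()):
    print(f"{user}\t{','.join(map(str, recs))}")
```

In this toy graph, user 0 gets `5,4`: user 5 shares two mutual friends with user 0 (users 1 and 2), so it ranks ahead of user 4, which shares only one, even though 4 has the lower ID. User 3 gets `1,2`, where both candidates share one mutual friend and the tie is broken by ascending ID. Unlike the inner join in the Spark version, this sketch keeps users with no candidates and gives them an empty list.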
