# **Bài tập lớn - Homework 1 - CS246**
# **Bài 1: Spark**
**Lớp học phần: DAT712_222_8_L14**

**Thành viên nhóm:**
* **Trịnh Nguyễn Nhật An - MSSV: 050608200222**
* **Nguyễn Hữu Viết Ngọc - MSSV: 050608200489**
* **Võ Ngọc Khánh Vy - MSSV: 050608200791**



**Cài đặt thư viện cần thiết**

In [None]:
# Setup: install PySpark, PyDrive and a headless Java 8 JDK.
# The lines starting with "!" are notebook shell escapes, not Python statements.
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq

import os
# Export JAVA_HOME for this process.
# NOTE(review): presumably this is where the apt package above installs Java 8 - confirm.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [None]:
from pyspark.sql import *
from pyspark import SparkConf, SparkContext
from itertools import combinations

In [None]:
# Initialize Spark.
# getOrCreate() reuses an already-running SparkContext, so re-running this cell
# in a live runtime does not fail because a context already exists.
conf = SparkConf().set("spark.ui.port", "4040")
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()
# Bare expression: the notebook displays the session as the cell output.
spark

**Định nghĩa một số hàm**

In [None]:
def line_to_user_friends(line):
    """Parse one adjacency-list line into a tuple (user, [friends]).

    The expected format is ``<user><TAB><friend>,<friend>,...``. Only the
    first two TAB-separated fields are read; any further fields are ignored.

    Args:
        line (str): A line of text.

    Returns:
        tuple: (user, [friends]) where user is an int and friends is a list
        of ints. friends is empty when the friends field is missing, blank,
        or the line contains no TAB at all. An empty tuple is returned when
        the user field is blank.

    Raises:
        ValueError: If the user or a friend token is not an integer.
    """

    # Drop a trailing line terminator so it cannot end up in the last token.
    fields = line.rstrip("\r\n").split("\t")

    user_field = fields[0].strip()
    if not user_field:
        return ()
    user = int(user_field)

    # A line without a TAB has no friends field; treat it as "no friends"
    # instead of failing with IndexError.
    friends_field = fields[1] if len(fields) > 1 else ""

    # Skip blank tokens (empty field, stray or doubled commas).
    friends = [int(token) for token in friends_field.split(",") if token.strip()]

    return (user, friends)

def user_friends_to_connections(user_friends):
    """Convert a tuple (user, [friends]) to a list of tuples (key, value).

    Each key is a pair of user ids ordered as (smaller, larger), so the same
    pair always produces the same key regardless of who lists whom.
    The value is 0 when the pair is already a direct friendship
    (user, friend), and 1 when the pair are two friends of ``user``,
    i.e. they have ``user`` as a mutual friend.

    Args:
        user_friends (tuple): (user, [friends]), or an empty tuple for a
            line that could not be parsed.

    Returns:
        list: [(key, value), ...]; empty when user_friends is empty.

    Example:
        >>> user_friends_to_connections((1, [2, 3]))
        [((1, 2), 0), ((1, 3), 0), ((2, 3), 1)]
    """

    # An empty tuple marks a malformed input line: it contributes nothing.
    if not user_friends:
        return []

    user = user_friends[0]
    friends = user_friends[1]

    # Existing friendships, flagged with 0.
    connections = [
        ((min(user, friend), max(user, friend)), 0) for friend in friends
    ]

    # Every pair of this user's friends shares this user as a mutual friend.
    connections.extend(
        ((min(friend1, friend2), max(friend1, friend2)), 1)
        for friend1, friend2 in combinations(friends, 2)
    )

    return connections

def mutual_friends_to_recommendations(pair_count):
    """Expand one counted pair into a recommendation for each of its members.

    Args:
        pair_count (tuple): (pair, n_mutual_friends), where pair holds two
            user ids and n_mutual_friends is the count for that pair.

    Returns:
        list: Two entries of the form (user, (friend, n_mutual_friends)):
        the first recommends the second member to the first member, the
        second recommends the first member to the second member.
    """

    members, shared_count = pair_count[0], pair_count[1]
    left, right = members[0], members[1]

    return [
        (left, (right, shared_count)),
        (right, (left, shared_count)),
    ]

def recommendations_to_top_10(recommendations, top_n=10):
    """Rank one user's candidates and return the ids of the best ones.

    Candidates are sorted by the number of mutual friends in descending
    order; ties are broken by candidate id in ascending order.

    Args:
        recommendations (iterable): Candidates for a single user, each a
            tuple (friend, n_mutual_friends).
        top_n (int): Maximum number of ids to return; expected to be
            non-negative. Defaults to 10.

    Returns:
        list: Candidate ids only (the counts are dropped), best first,
        at most top_n entries.

    Example:
        >>> recommendations_to_top_10([(2, 5), (3, 1), (4, 3), (5, 2), (6, 7)])
        [6, 2, 4, 5, 3]
    """

    # Negating the count gives descending order on count while keeping
    # ascending order on the id used as tie-breaker.
    ranked = sorted(
        recommendations,
        key=lambda candidate: (-candidate[1], candidate[0]),
    )
    return [candidate[0] for candidate in ranked[:top_n]]

**Chạy thuật toán gợi ý**

\* *Thời gian chạy không cố định: có lúc mất 3 phút, có lúc 7 phút hoặc 10 phút. Nếu chạy quá lâu, hãy restart runtime của Colab và chạy lại từ đầu.*

In [None]:
# Read file
lines = sc.textFile("soc-LiveJournal1Adj.txt")

# Map each line to a tuple (user, [friends]).
# line_to_user_friends returns an empty tuple for a malformed line; drop those
# rows here because every later stage indexes into the tuple and would fail.
# The RDD is cached because it is consumed twice: below, and again when the
# users without friends are appended to the recommendations.
user_friends = (
    lines.map(line_to_user_friends)
    .filter(lambda x: len(x) == 2)
    .cache()
)

# Map each tuple (user, [friends]) to a list of tuples (key, value)
connections = user_friends.flatMap(user_friends_to_connections)

# Keep only the pairs that are not already friends (no 0 among their values),
# then sum the remaining 1s to get the number of mutual friends per pair.
mutual_friends = connections.groupByKey().filter(lambda x: 0 not in x[1]).flatMap(lambda x: [(x[0], value) for value in x[1]])
mutual_friends_count = mutual_friends.reduceByKey(lambda x, y: x + y)

# Recommend friends: collect the candidates of every user, then rank them
recommendations = mutual_friends_count.flatMap(mutual_friends_to_recommendations).groupByKey()
recommendations = recommendations.map(lambda x: (x[0], recommendations_to_top_10(x[1]))).sortByKey()

In [None]:
# Select the users whose friend list is empty
users = user_friends.filter(lambda entry: not entry[1])

# Append those users to the recommendations with an empty recommendation
# list, then order everything by user id in ascending order
recommendations = recommendations.union(
    users.map(lambda entry: (entry[0], []))
)
recommendations = recommendations.sortByKey()

**Gợi ý bạn bè**

In [None]:
def recommend_friend(user_id):
    """Return the list of recommended friends for a user.

    Reads the module-level ``recommendations`` RDD of
    (user, [recommended friends]) entries.

    Args:
        user_id (int): The user id.

    Returns:
        list: The recommended friend ids of the first entry whose key equals
        user_id, or an empty list when no entry has that key.
    """

    matches = recommendations.filter(lambda x: x[0] == user_id).collect()
    # An unknown user id yields no rows; return an empty list rather than
    # failing with IndexError on matches[0].
    if not matches:
        return []
    return matches[0][1]

# Check the result
print(recommend_friend(11))

[27552, 7785, 27573, 27574, 27589, 27590, 27600, 27617, 27620, 27667]


**Xuất kết quả gợi ý của các user sau: 924, 8941, 8942, 9019, 9020, 9021, 9022, 9990, 9992, 9993**

In [None]:
# Collect the recommendations of the requested users to the driver and
# print one line per user.
result = (
    recommendations
    .filter(
        lambda entry: entry[0]
        in {924, 8941, 8942, 9019, 9020, 9021, 9022, 9990, 9992, 9993}
    )
    .collect()
)
for user_id, friends in result:
    print(f"User {user_id}: {friends}")

User 924: [439, 2409, 6995, 11860, 15416, 43748, 45881]
User 8941: [8943, 8944, 8940]
User 8942: [8939, 8940, 8943, 8944]
User 9019: [9022, 317, 9023]
User 9020: [9021, 9016, 9017, 9022, 317, 9023]
User 9021: [9020, 9016, 9017, 9022, 317, 9023]
User 9022: [9019, 9020, 9021, 317, 9016, 9017, 9023]
User 9990: [13134, 13478, 13877, 34299, 34485, 34642, 37941]
User 9992: [9987, 9989, 35667, 9991]
User 9993: [9991, 13134, 13478, 13877, 34299, 34485, 34642, 37941]


**Xuất file kết quả**

In [None]:
# Format every entry as "<user><TAB><friend>,<friend>,...", bring the lines
# to the driver and write them to a local text file, one entry per line.
output_file = "recommendations.txt"
output = recommendations.map(
    lambda x: f"{x[0]}\t{','.join(map(str, x[1]))}"
).collect()

with open(output_file, "w") as f:
    for line in output:
        f.write(line + "\n")

In [None]:
# Stop Spark: shut down the SparkContext created at the start of the notebook.
sc.stop()