# Homework 3

Submit your `.ipynb` file through Gradescope. To get it, use `File` ⇒ `Download` ⇒ `Download .ipynb`, then submit it together with your PDF, which should include a link to your repository.

### Setup

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [2]:
#@title Import PySpark and create SparkContext

import itertools
import os

import pyspark
from pyspark import SparkConf, SparkContext
# Explicit imports: `from pyspark.sql.functions import *` would shadow Python
# built-ins such as sum, max, min, round and filter for every later cell.
from pyspark.sql import SparkSession

# JAVA_HOME must point at the JDK before the SparkContext below starts the JVM;
# it is set here too so this cell does not depend on the setup cell's state.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Spark configuration: serve the Spark UI on port 4050.
conf = SparkConf().set("spark.ui.port", "4050")

# Create (or reuse) the context and the session. getOrCreate keeps this cell
# re-runnable; constructing a second SparkContext in one kernel raises an error.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [3]:
#@title Download the data from the course website
!wget https://course.ccs.neu.edu/cs6220/fall2023/homework-3/soc-LiveJournal1Adj.txt

--2024-02-07 07:25:16--  https://course.ccs.neu.edu/cs6220/fall2023/homework-3/soc-LiveJournal1Adj.txt
Resolving course.ccs.neu.edu (course.ccs.neu.edu)... 129.10.117.35
Connecting to course.ccs.neu.edu (course.ccs.neu.edu)|129.10.117.35|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4156181 (4.0M) [text/plain]
Saving to: ‘soc-LiveJournal1Adj.txt’


2024-02-07 07:25:18 (2.73 MB/s) - ‘soc-LiveJournal1Adj.txt’ saved [4156181/4156181]



### Load the data in!

In [4]:
# Load the adjacency list into a single-partition RDD and split every line on
# whitespace, giving [user_id, "friend,friend,..."] records (just [user_id] when
# the friend field is empty).
lines = (
    sc.textFile("soc-LiveJournal1Adj.txt", minPartitions=1)
    .map(lambda text: text.split())
)

In [5]:
# Peek at the first two whitespace-split records.
lines.take(num=2)

[['0',
  '1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94'],
 ['1',
  '0,5,20,135,2409,8715,8932,10623,12347,12846,13840,13845,14005,20075,21556,22939,23520,28193,29724,29791,29826,30691,31232,31435,32317,32489,34394,35589,35605,35606,35613,35633,35648,35678,38737,43447,44846,44887,49226,49985,623,629,4999,6156,13912,14248,15190,17636,19217,20074,27536,29481,29726,29767,30257,33060,34250,34280,34392,34406,34418,34420,34439,34450,34651,45054,49592']]

# Recommendation Algorithm

In [6]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import os

# Function to parse each line of the input file
def parse_line(line):
    """Parse one tab-separated adjacency line into ``(user, [friends])``.

    Args:
        line: A raw text line of the form ``user<TAB>friend,friend,...``.

    Returns:
        A ``(user, friends)`` tuple of strings. ``friends`` is an empty list when
        the line has no second tab-separated field or that field is empty.
    """
    parts = line.split('\t')
    user = parts[0]
    # Drop empty tokens: ''.split(',') is [''], which would otherwise invent a
    # phantom friend with an empty id for users who have no friends.
    friends = [friend for friend in parts[1].split(',') if friend] if len(parts) > 1 else []
    return user, friends

# Step 1: Read the raw text lines and parse each into (user, [friend, ...]).
# NOTE: this rebinds `lines` (an earlier cell left it holding whitespace-split
# records) to the unparsed text lines of the file.
lines = sc.textFile(name="soc-LiveJournal1Adj.txt")
user_friends = lines.map(f=parse_line)

# Step 2: Map phase - every two friends of a user share that user as a mutual friend
def generate_pairs(user_friends):
    """Emit a ``((a, b), 1)`` vote for each ordered pair of this user's friends.

    Args:
        user_friends: ``(user, friends)`` as produced by ``parse_line``.

    Yields:
        ``((a, b), 1)`` followed by ``((b, a), 1)`` for each unordered pair
        ``{a, b}`` of distinct positions in ``friends``.
    """
    _, friend_list = user_friends
    for left, right in itertools.combinations(friend_list, 2):
        yield (left, right), 1
        yield (right, left), 1

# Mark every direct friendship so it can be told apart from suggestion candidates
def filter_existing_friends(user_friends):
    """Emit ``((user, friend), -1)`` for every direct friendship edge of this user.

    Args:
        user_friends: ``(user, friends)`` as produced by ``parse_line``.

    Yields:
        One ``((user, friend), -1)`` pair per entry in ``friends``, in order.
    """
    user, friend_list = user_friends
    yield from (((user, friend), -1) for friend in friend_list)

mapped_pairs = user_friends.flatMap(generate_pairs)
existing_friendships = user_friends.flatMap(filter_existing_friends)

# Step 3: Reduce phase - Sum mutual-friend votes per (user, candidate) pair, then
# drop every pair that is already a direct friendship. Removing those pairs by key
# (instead of adding a -1 into the sum) is what actually excludes them: two users
# who are already friends and share k >= 2 mutual friends would otherwise keep a
# positive total of k - 1 and still be suggested to each other.
friend_suggestions = (mapped_pairs
                      .reduceByKey(lambda a, b: a + b)
                      .subtractByKey(existing_friendships)
                      .filter(lambda pair_count: pair_count[1] > 0))

# Step 4: Recommendations - Sort and take top N recommendations
def sort_and_take_top_n(record, n=10):
    """Pick a user's top-``n`` suggested friends.

    Args:
        record: ``(user, suggestions)`` where ``suggestions`` is an iterable of
            ``(suggested_user_id, mutual_friend_count)`` pairs; ids are numeric
            strings.
        n: Maximum number of suggestions to keep (default 10).

    Returns:
        ``(user, [suggested_user_id, ...])`` ordered by mutual-friend count
        descending, with ties broken by ascending numeric user id.
    """
    user, suggestions = record
    # Ids are strings; compare them as integers so that "9" ranks before "10".
    ranked = sorted(suggestions, key=lambda item: (-item[1], int(item[0])))
    return user, [suggested for suggested, _ in ranked[:n]]

def _to_user_keyed(pair_count):
    """Re-key ``((user, suggested), count)`` as ``(user, (suggested, count))``."""
    (user, suggested), count = pair_count
    return user, (suggested, count)


# Group every candidate under its user and keep that user's top suggestions.
# NOTE(review): a user with no pair in `friend_suggestions` never reaches the
# groupByKey, so that user is absent from `recommendations` — confirm whether the
# expected output needs an empty entry for such users.
recommendations = (
    friend_suggestions
    .map(_to_user_keyed)
    .groupByKey()
    .mapValues(list)
    .map(sort_and_take_top_n)
)

# Step 5: Output - Save the recommendations as "user<TAB>id,id,..." lines in one file
import glob
import os
import shutil

OUTPUT_TEMP_DIR = "/content/output_temp"
OUTPUT_FILE = "/content/output.txt"

# saveAsTextFile refuses to write into an existing directory, so clear leftovers
# from an earlier (possibly interrupted) run to keep this cell re-runnable.
shutil.rmtree(OUTPUT_TEMP_DIR, ignore_errors=True)
(recommendations
 .map(lambda x: f"{x[0]}\t{','.join(x[1])}")
 .coalesce(1)
 .saveAsTextFile(OUTPUT_TEMP_DIR))

# Concatenate all part files into a single file using absolute paths. Changing the
# kernel's working directory into the temp directory and then deleting it would
# leave every later relative path and shell command pointing at a missing folder.
with open(OUTPUT_FILE, "wb") as merged:
    for part_path in sorted(glob.glob(os.path.join(OUTPUT_TEMP_DIR, "part-*"))):
        with open(part_path, "rb") as part:
            shutil.copyfileobj(part, merged)

# Cleanup: Remove the temporary directory to free up space
shutil.rmtree(OUTPUT_TEMP_DIR)
