# Homework 2
## Social network friendship recommendation algorithm in Spark

### Setup

In [7]:
# Setup Spark on Colab environment

# Install PySpark and PyDrive into the runtime (note: versions are not pinned)
!pip install pyspark
!pip install -U -q PyDrive
# Install a headless Java 8 JDK through apt
!apt install openjdk-8-jdk-headless -qq
import os
# Point JAVA_HOME at the Java 8 install location for this process
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m17.9 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=b620a7a9492eda212e375d0955dd80091d4e0ceee52569f72535ad891c4bd5f1
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [8]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()  # run the Colab authentication flow
gauth = GoogleAuth()
# Hand the application-default credentials to PyDrive
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)  # client used by the next cell to fetch the data file

In [9]:
# Download the homework data set from Google Drive by its file ID.
# The ID is stored in `file_id` rather than `id` so the built-in id() is not
# shadowed for the remainder of the notebook session.
file_id = '1K6CfkG-SbmqOPnvQbseQoIipfjyFpt0i'
downloaded = drive.CreateFile({'id': file_id})
downloaded.GetContentFile('hw2_data.txt')

In [10]:
# Keep only the first 10000 lines of the data set as a smaller working sample
!head -n 10000 hw2_data.txt > short_data.txt

In [11]:
from pyspark.sql import *
#from pyspark.sql.functions import *
from pyspark import SparkConf, SparkContext
import pandas as pd

import re
import sys
import itertools

# Create (or reuse) the Spark context. The configuration object is passed
# explicitly so that any settings added to `conf` actually take effect
# instead of being silently ignored.
conf = SparkConf()
sc = SparkContext.getOrCreate(conf)

# Load the sampled file as an RDD of text lines (second argument: minimum partitions)
users = sc.textFile("short_data.txt", 1)
users.take(5)

['0\t1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94',
 '1\t0,5,20,135,2409,8715,8932,10623,12347,12846,13840,13845,14005,20075,21556,22939,23520,28193,29724,29791,29826,30691,31232,31435,32317,32489,34394,35589,35605,35606,35613,35633,35648,35678,38737,43447,44846,44887,49226,49985,623,629,4999,6156,13912,14248,15190,17636,19217,20074,27536,29481,29726,29767,30257,33060,34250,34280,34392,34406,34418,34420,34439,34450,34651,45054,49592',
 '2\t0,117,135,1220,2755,12453,24539,24714,41456,45046,49927,6893,13795,16659,32828,41878',
 '3\t0,12,41,55,1532,12636,13185,27552,38737',
 '4\t0,8,14,15,18,27,72,80,15326,19068,19079,24596,42697,46126,74,77,33269,38792,38822']

In [12]:
# Parse one input line into a (user_id, [friend_ids]) tuple
def friend_pairs(line):
    """Parse one adjacency-list line into ``(user_id, friends)``.

    The line is split on whitespace: the first token is the user ID and the
    optional second token is a comma-separated list of friend IDs.

    Returns:
        A tuple of the integer user ID and a list of integer friend IDs.
        The list is empty when the line has no friend token.
    """
    tokens = line.split()
    owner = int(tokens[0])
    # A lone user ID means this user has no friends listed.
    if len(tokens) == 1:
        return owner, []
    return owner, [int(friend_id) for friend_id in tokens[1].split(',')]

# Build keyed records for each direct friendship and for each pair of this user's friends
def user_connected_friends(friendships):
    """Expand one user's friend list into keyed connection records.

    Args:
        friendships: ``(user_id, friends)`` as produced by ``friend_pairs``.

    Returns:
        A list of ``((smaller_id, larger_id), flag)`` tuples. Records for the
        user's direct friendships come first with flag 0, followed by one
        record with flag 1 for every 2-combination of the user's friends
        (those two share this user as a mutual friend).
    """
    owner = friendships[0]
    friend_ids = friendships[1]

    # Keys are always ordered (low, high) so both directions of a pair collide.
    direct = [((min(owner, friend), max(owner, friend)), 0) for friend in friend_ids]
    via_owner = [
        ((min(left, right), max(left, right)), 1)
        for left, right in itertools.combinations(friend_ids, 2)
    ]
    return direct + via_owner

# Expand each (friend_pair, mutual_count) into one recommendation record per user
def user_friend_recommendation_pairs(mutual):
    """Turn one ``(pair, count)`` record into two per-user recommendation records.

    Args:
        mutual: ``((f1, f2), mutual_friends_count)``.

    Returns:
        ``[(f1, (f2, count)), (f2, (f1, count))]`` so that each user of the
        pair is keyed with the other user as a candidate.
    """
    pair = mutual[0]
    count = mutual[1]
    first, second = pair[0], pair[1]
    return [(first, (second, count)), (second, (first, count))]

# Rank the candidates and keep at most the top 10 recommended friend IDs
def sort_recommendations(recs, limit=10):
    """Rank recommendation candidates and return the best candidate IDs.

    Candidates are ordered by mutual-friend count (descending) and then by
    candidate ID (ascending) to break ties.

    Args:
        recs: Iterable of ``(candidate_id, mutual_friend_count)`` pairs. Any
            iterable is accepted, and it is never modified.
        limit: Maximum number of IDs to return. Defaults to 10.

    Returns:
        A list of at most ``limit`` candidate IDs in ranked order.
    """
    # sorted() builds a new list, so the caller's sequence is left untouched.
    ranked = sorted(recs, key=lambda rec: (-int(rec[1]), int(rec[0])))
    return [rec[0] for rec in ranked[:limit]]


In [13]:
# Parse every input line with friend_pairs and preview the first five results
friendship_pairs = users.map(friend_pairs)
friendship_pairs.take(5)
#friendship_pairs.saveAsTextFile(outputDir)

[(0,
  [1,
   2,
   3,
   4,
   5,
   6,
   7,
   8,
   9,
   10,
   11,
   12,
   13,
   14,
   15,
   16,
   17,
   18,
   19,
   20,
   21,
   22,
   23,
   24,
   25,
   26,
   27,
   28,
   29,
   30,
   31,
   32,
   33,
   34,
   35,
   36,
   37,
   38,
   39,
   40,
   41,
   42,
   43,
   44,
   45,
   46,
   47,
   48,
   49,
   50,
   51,
   52,
   53,
   54,
   55,
   56,
   57,
   58,
   59,
   60,
   61,
   62,
   63,
   64,
   65,
   66,
   67,
   68,
   69,
   70,
   71,
   72,
   73,
   74,
   75,
   76,
   77,
   78,
   79,
   80,
   81,
   82,
   83,
   84,
   85,
   86,
   87,
   88,
   89,
   90,
   91,
   92,
   93,
   94]),
 (1,
  [0,
   5,
   20,
   135,
   2409,
   8715,
   8932,
   10623,
   12347,
   12846,
   13840,
   13845,
   14005,
   20075,
   21556,
   22939,
   23520,
   28193,
   29724,
   29791,
   29826,
   30691,
   31232,
   31435,
   32317,
   32489,
   34394,
   35589,
   35605,
   35606,
   35613,
   35633,
   35648,
   35678,
   38737,
   43

In [14]:
# Flatten each user's friend list into keyed connection records
friend_connections = friendship_pairs.flatMap(user_connected_friends)
friend_connections.take(5)
# Example: [((0, 1), 0), ((0, 2), 0), ((0, 3), 0), ((0, 4), 0), ((0, 5), 0)]
# Record shape: ((friend_pair), flag), where flag is 0 for a direct friendship
# and 1 for two users who share a mutual friend.
# friend_connections.saveAsTextFile(outputDir)

[((0, 1), 0), ((0, 2), 0), ((0, 3), 0), ((0, 4), 0), ((0, 5), 0)]

In [15]:
# Count mutual friends for every pair of users who are not already friends:
# group the flags per pair, drop any pair whose flags include a 0 (a direct
# friendship), then sum the remaining 1s.
mutual_connections = (
    friend_connections
    .groupByKey()
    .filter(lambda grouped: 0 not in grouped[1])
    .map(lambda grouped: (grouped[0], sum(grouped[1])))
)
# Turn each (pair, count) into one recommendation record per user in the pair
recommendations = mutual_connections.flatMap(user_friend_recommendation_pairs)

In [16]:
# Gather each user's candidates and rank them down to the top recommendations
friend_recommendations = recommendations.groupByKey().mapValues(
    lambda candidates: sort_recommendations(list(candidates))
)
friend_recommendations.take(5)
# Example: [(24582, [9607, 13478, 24546, 24686, 35071, 4687, 7506, 7639, 14204, 24530]), (32780, [4425, 32440, 32761, 32775, 1412, 4339, 4408, 4414, 32356, 32394]), ...]; (user_id, [list of up to 10 recommended friends])

[(27656,
  [19365, 25186, 37011, 44049, 10018, 35179, 37132, 41851, 44089, 44101]),
 (35599, [19, 1347, 2043, 2496, 9803, 11399, 14240, 17195, 17636, 32169]),
 (32640, [19, 242, 522, 553, 580, 886, 1234, 2122, 3549, 3706]),
 (28196, [39, 206, 494, 543, 611, 660, 964, 1495, 1606, 2128]),
 (24228, [343, 24230, 24250, 18445, 705, 16796, 24218, 24249, 64, 265])]

In [17]:
# Keep only the recommendations for the user IDs of interest, ordered by user ID
target_user_ids = [924, 8941, 8942, 9019, 9020, 9021, 9022, 9990, 9992, 9993]
user_ids_recs = (
    friend_recommendations
    .filter(lambda entry: entry[0] in target_user_ids)
    .sortByKey()
)
user_ids_recs.collect()

[(924, [439, 2409, 6995, 11860, 15416, 43748, 45881]),
 (9019, [9022, 317, 9023]),
 (9020, [9021, 9016, 9017, 9022, 317, 9023]),
 (9021, [9020, 9016, 9017, 9022, 317, 9023]),
 (9022, [9019, 9020, 9021, 317, 9016, 9017, 9023])]

In [18]:
import shutil

# saveAsTextFile writes a directory of part files and raises if the target
# already exists, so remove any output left by a previous run first. This
# keeps the cell re-runnable after a kernel restart.
shutil.rmtree("output.txt", ignore_errors=True)
user_ids_recs.saveAsTextFile("output.txt")
sc.stop()