# People You Might Know

In this notebook I will write a **Spark** program to implement a simple *People You Might Know* social network friendship recommendation algorithm. The key idea is that if two people have a lot of mutual friends, then the system should recommend that they connect with each other.

Here we assume friendship is bidirectional: if A is a friend of B, then B is also a friend of A.

In [None]:
# Colab environment setup (the `!` lines are shell escapes run in a subprocess, not Python).
# NOTE(review): pyspark is unpinned; the recorded output below was produced with
# pyspark 2.4.5 / py4j 0.10.7 — consider pinning the version for reproducibility.
!pip install pyspark
# NOTE(review): PyDrive is not used anywhere else in this notebook — confirm it is still needed.
!pip install -U -q PyDrive
# Spark runs on the JVM, so install a headless Java 8 JDK.
!apt install openjdk-8-jdk-headless -qq
import os
# Point Spark at the JDK installed above (assumes the standard Debian/Ubuntu amd64 install path).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 64kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 44.8MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=e536147bf372a6683a67ba6570bf7197d1080ef9b2dceef3ccc733064cf9b018
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5
open

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
import pandas as pd

# Obtain the (possibly already running) SparkSession, then expose its
# SparkContext as `sc` for the RDD-based steps used throughout the notebook.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

## Load Data


The file contains the adjacency list and has multiple lines in the following format:

$<\texttt{User}><\texttt{TAB}><\texttt{Friends}>$

Here, $<\texttt{User}>$ is a unique integer ID corresponding to a unique user and $<\texttt{Friends}>$ is a comma separated list of unique IDs corresponding to the friends of the user with the unique ID $<\texttt{User}>$. Note that the friendships are mutual.

In [None]:
# One RDD element per raw line of the adjacency list: "<User>\t<Friends>".
adjacency_path = "soc-LiveJournal1Adj.txt"
dat = sc.textFile(adjacency_path)
dat.take(2)

['0\t1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94',
 '1\t0,5,20,135,2409,8715,8932,10623,12347,12846,13840,13845,14005,20075,21556,22939,23520,28193,29724,29791,29826,30691,31232,31435,32317,32489,34394,35589,35605,35606,35613,35633,35648,35678,38737,43447,44846,44887,49226,49985,623,629,4999,6156,13912,14248,15190,17636,19217,20074,27536,29481,29726,29767,30257,33060,34250,34280,34392,34406,34418,34420,34439,34450,34651,45054,49592']

## Data Pre-processing

In [None]:
# Separate each line at the TAB into [User, Friends]; both parts are still strings.
dat_split = dat.map(lambda line: line.split(sep="\t"))
dat_split.take(2)

[['0',
  '1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94'],
 ['1',
  '0,5,20,135,2409,8715,8932,10623,12347,12846,13840,13845,14005,20075,21556,22939,23520,28193,29724,29791,29826,30691,31232,31435,32317,32489,34394,35589,35605,35606,35613,35633,35648,35678,38737,43447,44846,44887,49226,49985,623,629,4999,6156,13912,14248,15190,17636,19217,20074,27536,29481,29726,29767,30257,33060,34250,34280,34392,34406,34418,34420,34439,34450,34651,45054,49592']]

In [None]:
# Convert the comma-separated friends string into a list of friend IDs.
# A user with no friends has an empty friends field, and "".split(",") returns [""],
# which would later produce a bogus (user, "") friend pair, so empty IDs are dropped.
friends_list = dat_split.map(
    lambda x: (x[0], [friend for friend in x[1].split(",") if friend])
)
friends_list.take(1)

In [None]:
# Expand every (user, [friends]) record into directed pairs (user, friend),
# i.e. one pair (a, b) for each b listed as a friend of a.
def _to_directed_pairs(record):
    user, friends = record
    return [(user, friend) for friend in friends]

friend_pairs = friends_list.flatMap(_to_directed_pairs)
friend_pairs.take(5)

[('0', '1'), ('0', '2'), ('0', '3'), ('0', '4'), ('0', '5')]

In [None]:
# Name the columns so the two RDDs can be joined as DataFrames.
pair_columns = ["UserA", "UserB"]
adjacency_columns = ["User", "Friends"]
friend_pairs_df = friend_pairs.toDF(pair_columns)
dat_df = dat_split.toDF(adjacency_columns)

In [None]:
# Preview the first two rows of each frame, pairs first.
for frame in (friend_pairs_df, dat_df):
    frame.show(2)

+-----+-----+
|UserA|UserB|
+-----+-----+
|    0|    1|
|    0|    2|
+-----+-----+
only showing top 2 rows

+----+--------------------+
|User|             Friends|
+----+--------------------+
|   0|1,2,3,4,5,6,7,8,9...|
|   1|0,5,20,135,2409,8...|
+----+--------------------+
only showing top 2 rows



## Friends Recommendation

In [None]:
# Attach each user's raw friends string twice: once looked up by UserA,
# once looked up by UserB. Column order ends up as
# UserA, UserB, FriendsOfUserA, FriendsOfUserB (later cells rely on it).
friends_df1 = (
    friend_pairs_df
    .join(dat_df, friend_pairs_df.UserA == dat_df.User, how="inner")
    .drop("User")
    .withColumnRenamed("Friends", "FriendsOfUserA")
)
friends_df1.show()

friends_df = (
    friends_df1
    .join(dat_df, friends_df1.UserB == dat_df.User, how="inner")
    .drop("User")
    .withColumnRenamed("Friends", "FriendsOfUserB")
)
friends_df.show()

In [None]:
# Drop back to the RDD API; each element is a Row(UserA, UserB, FriendsOfUserA, FriendsOfUserB).
friends_rdd = friends_df.rdd
friends_rdd.take(num=2)

[Row(UserA='37246', UserB='10096', FriendsOfUserA='10063,10100,36729,36862,36868,36894,36903,36919,36955,37044,37085,37155,37167,10021,10040,10096,10135,36701,36817,37047,37061,37069,37096,37168,37296,37337,37426,37511,37823,38479,40824,44132,37559,37743', FriendsOfUserB='10000,10008,10010,10026,10039,10053,10072,10093,10103,10114,10123,10142,36683,36687,36693,36717,36720,36728,36811,36817,36905,36947,37068,37135,37156,37167,37170,44050,44053,44094,10097,21567,22115,25200,36678,36688,36703,36707,36723,36738,36752,36765,36777,36780,36788,36799,36800,36801,36816,36824,36827,36836,36840,36841,36847,36875,36887,36889,36892,36908,36936,36952,36962,36980,36988,37000,37006,37020,37035,37041,37060,37070,37140,37148,37168,37220,37221,37231,37246,37253,37326,37385,37438,37459,37485,37493,37518,37575,37613,37665,37707,37725,37750,37779,37806,37812,44083,44149,44548'),
 Row(UserA='10114', UserB='10096', FriendsOfUserA='1135,10000,10018,10021,10026,10068,10072,10078,10086,10093,10095,10096,10097,10

In [None]:
# Turn both comma-separated friend strings into lists of IDs,
# producing [UserA, UserB, friends_of_A, friends_of_B].
friends_two_lists = friends_rdd.map(
    lambda row: [
        row.UserA,
        row.UserB,
        row.FriendsOfUserA.split(","),
        row.FriendsOfUserB.split(","),
    ]
)
friends_two_lists.take(1)

In [None]:
# For each (A, B) pair keep B's friends that are not already A's friends:
# these are A's 2nd-order, not-yet-connected candidates. A itself is typically
# among them, since friendships are mutual and A is in B's list.
def _candidates_via_b(row):
    user_a, _user_b, friends_of_a, friends_of_b = row
    return (user_a, list(set(friends_of_b) - set(friends_of_a)))

friends_diff = friends_two_lists.map(_candidates_via_b)
friends_diff.take(1)

In [None]:
# Concatenate all candidate lists belonging to the same user. A candidate appears
# once per intermediate friend B, so duplicates are intentional (they are counted next).
friends_diff_total = friends_diff.reduceByKey(lambda left, right: left + right)
friends_diff_total.take(1)

In [None]:
# Remove user A from their own candidate list and convert the remaining IDs to int.
# Compare IDs for equality: the previous `i not in x[0]` was a *substring* test on the
# user-ID string, so e.g. for user '10142' it also dropped candidates '1', '10', '42', ...
friends_diff_total_exclude_self = friends_diff_total.map(
    lambda x: (x[0], [int(i) for i in x[1] if i != x[0]])
)
friends_diff_total_exclude_self.take(1)

In [None]:
# sanity check: a user must not be among their own candidates.
# Candidate IDs are ints while the user key is a string, hence int(...).
# expect the following code to return False
sample_user, sample_candidates = friends_diff_total_exclude_self.take(1)[0]
int(sample_user) in sample_candidates

False

In [None]:
# Count how often each candidate occurs for a user (= number of mutual friends),
# as (candidate, count) pairs ordered from most to least frequent.
from collections import Counter

def _count_candidates(record):
    user, candidates = record
    return (user, Counter(candidates).most_common())

friends_diff_total_count = friends_diff_total_exclude_self.map(_count_candidates)
friends_diff_total_count.take(1)

In [None]:
# Convert each user's (candidate, count) pairs into a pandas Series
# indexed by candidate ID with the mutual-friend count as value.
def _as_count_series(record):
    user, candidate_counts = record
    return (user, pd.Series(dict(candidate_counts)))

friends_pd_series = friends_diff_total_count.map(_as_count_series)
friends_pd_series.take(1)

[('10142', 37135    15
  36905    12
  36811    12
  36683    10
  10114    10
           ..
  36944     1
  36711     1
  44144     1
  37112     1
  36921     1
  Length: 564, dtype: int64)]

In [None]:
# Order each user's candidates by number of common friends (desc), then by ID (asc).
import numpy as np

def _sort_by_count_then_id(counts):
    # np.lexsort treats its LAST key as the primary one:
    # primary = -count (count descending), tie-breaker = candidate ID ascending.
    order = np.lexsort((counts.index, -counts.to_numpy()))
    return counts.iloc[order]

friends_pd_series_sorted = friends_pd_series.map(
    lambda record: (record[0], _sort_by_count_then_id(record[1]))
)
friends_pd_series_sorted.take(1)

[('10142', 37135    15
  36811    12
  36905    12
  10103    10
  10114    10
           ..
  44204     1
  44205     1
  44215     1
  44241     1
  44537     1
  Length: 564, dtype: int64)]

In [None]:
# Keep the top 10 candidates per user. The Series index holds integer user IDs,
# so use .iloc to make the slice explicitly positional instead of relying on
# pandas' ambiguous label-vs-position rules for `series[:10]`.
top_10_friends = friends_pd_series_sorted.map(lambda x: (x[0], x[1].iloc[:10]))
top_10_friends.take(1)

[('10142', 37135    15
  36811    12
  36905    12
  10103    10
  10114    10
  36679    10
  36683    10
  37156    10
  10000     9
  36669     9
  dtype: int64)]

In [None]:
# Discard the counts and keep only the recommended IDs (the Series index), in rank order.
recommends = top_10_friends.map(lambda record: (record[0], record[1].index.tolist()))
recommends.take(1)

[('10142',
  [37135, 36811, 36905, 10103, 10114, 36679, 36683, 37156, 10000, 36669])]

In [None]:
# sanity check against the known expected recommendations for user 11;
# asserting (rather than eyeballing) makes a regression fail loudly.
expected_11 = [27552, 7785, 27573, 27574, 27589, 27590, 27600, 27617, 27620, 27667]
test = recommends.filter(lambda x: x[0] == '11')
test_result = test.take(1)
assert test_result == [('11', expected_11)], test_result
test_result

[('11', [27552, 7785, 27573, 27574, 27589, 27590, 27600, 27617, 27620, 27667])]

In [None]:
# Recommendations for the user IDs requested in the write-up
# (keys are unique per user, so at most one row per target).
targets = ['924', '8941', '8942', '9019', '9020', '9021', '9022', '9990', '9992', '9993']
tests = recommends.filter(lambda record: record[0] in targets)
tests.take(len(targets))

[('11', [27552, 7785, 27573, 27574, 27589, 27590, 27600, 27617, 27620, 27667])]

In [None]:
# format output: [user, [recommended IDs as Python ints]]
recommends_int_id = recommends.map(lambda record: [record[0], list(map(int, record[1]))])
recommends_int_id.take(1)

[['10142',
  [37135, 36811, 36905, 10103, 10114, 36679, 36683, 37156, 10000, 36669]]]

In [None]:
recommends_strip_bracket = recommends_int_id.map(lambda x: [x[0], str(x[1]).strip('[]')])
recommends_strip_bracket.take(1)

[['10142',
  '37135, 36811, 36905, 10103, 10114, 36679, 36683, 37156, 10000, 36669']]

In [None]:
# Final line format: "<User>\t<Recommendations>".
recommends_deliverable = recommends_strip_bracket.map(lambda record: "\t".join(record))
recommends_deliverable.take(2)

['10142\t37135, 36811, 36905, 10103, 10114, 36679, 36683, 37156, 10000, 36669',
 '36947\t36936, 36908, 10000, 36665, 36988, 10028, 37170, 36836, 37070, 37168']

In [None]:
 recommends_deliverable.coalesce(1).saveAsTextFile("Q1_output")