In [1]:
# Install PySpark, PyDrive and a headless JDK 8 into the notebook runtime.
# NOTE(review): none of these installs is version-pinned, so a re-run may pull
# different releases than the ones recorded in this cell's output.
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
# Point JAVA_HOME at a JDK 8 directory (presumably where the apt package above
# installs it; confirm on the target image).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate the Colab user, then build a PyDrive client from the
# application-default credentials.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
# NOTE(review): the name `drive` is rebound by `from google.colab import drive`
# in the next cell, so this PyDrive client is not reachable afterwards.
drive = GoogleDrive(gauth)

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 67kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 48.2MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=1c228fb1bd0e1e5c243c4daa3cac12c34b4025085c5e03b9ded3ccc35125de58
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5
open

In [2]:
# Mount Google Drive into the runtime filesystem at /content/drive.
# NOTE(review): this import rebinds the name `drive`, which the first cell had
# assigned to a PyDrive `GoogleDrive` client.
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [0]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import functions as F

import numpy as np
import pandas as pd

In [0]:
# Create the Spark context and session in local mode (`local[*]`).
# The memory limits are supplied through SparkConf before the context exists.
conf = (
    SparkConf()
    .setAppName("App")
    .setMaster('local[*]')
    .set('spark.executor.memory', '6G')
    .set('spark.driver.memory', '45G')
    .set('spark.driver.maxResultSize', '10G')
)
# getOrCreate returns the already-running context when there is one, so
# re-running this cell no longer fails with
# "Cannot run multiple SparkContexts at once". On a fresh kernel it builds a
# new context from `conf`, exactly as the direct constructor call did.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [0]:
# Load the input file as an RDD of text lines.
# NOTE(review): the path is relative and does not point into /content/drive;
# confirm where the file is expected to be staged before this cell runs.
data = sc.textFile('soc-LiveJournal1Adj.txt')

In [0]:
def lambda_process_line(line):
    """Parse one input line into ``(user_id, friend_ids)``.

    The expected layout is ``<user_id>\\t<friend>,<friend>,...``.

    Args:
        line: A single text line from the input file.

    Returns:
        A tuple ``(user_id, friend_ids_lst)`` where ``friend_ids_lst`` is the
        list of comma-separated ids after the first tab, or ``[]`` when that
        part is empty or the line contains no tab at all.
    """
    # partition() always yields three parts, so a line without a tab (a bare
    # user id or a blank line) is treated as "no friends" instead of raising
    # ValueError from a failed two-name unpack and aborting the whole job.
    user_id, _, friend_ids = line.partition('\t')
    friend_ids_lst = friend_ids.split(',') if friend_ids != '' else []
    return (user_id, friend_ids_lst)

def lambda_make_pairs(line):
    """Expand one ``(user_id, friend_ids)`` record into keyed id pairs.

    Every emitted element is ``((a, b), marker)`` with the two ids ordered so
    that ``a <= b`` under plain ``>`` comparison of the values as given.

    * marker ``0``: the user paired with each of their friends.
    * marker ``1``: each unordered pair of the user's friends, i.e. two ids
      that share this user as a common friend.

    Args:
        line: An indexable record whose item 0 is the user id and item 1 is
            the sequence of that user's friend ids.

    Returns:
        A list with all marker-0 pairs first (in friend order), followed by
        the marker-1 pairs (in nested index order).
    """
    owner = line[0]
    friends = line[1]

    def _ordered(first, second):
        # Swap only when strictly greater, so equal ids keep their order.
        return (second, first) if first > second else (first, second)

    # first-degree friendship
    direct = [(_ordered(owner, friend), 0) for friend in friends]

    # second-degree friendship
    shared = [
        (_ordered(left, right), 1)
        for pos, left in enumerate(friends)
        for right in friends[pos + 1:]
    ]
    return direct + shared

In [0]:
# step 1: parse every line, then expand each user record into keyed id pairs
processed_data = data.map(lambda_process_line)
all_friend_pairs = processed_data.flatMap(lambda_make_pairs)

In [0]:
# step 2: drop every pair that carries a 0 marker (already direct friends),
# then flatten the surviving groups back into one (pair, marker) row each
mutual_friend_pairs = (
    all_friend_pairs
    .groupByKey()
    .filter(lambda keyed: 0 not in keyed[1])
    .flatMapValues(lambda markers: markers)
)

In [0]:
# step 3: add up the markers per pair
reduced_mutual_friend_pairs = mutual_friend_pairs.reduceByKey(
    lambda left, right: left + right
)

In [0]:
# step 4: emit each pair in both directions as (user, (other_user, count)),
# then gather every user's candidates into a list
recommend_friend_pairs = (
    reduced_mutual_friend_pairs
    .flatMap(lambda entry: [
        (entry[0][0], (entry[0][1], entry[1])),
        (entry[0][1], (entry[0][0], entry[1])),
    ])
    .groupByKey()
    .mapValues(list)
)

In [0]:
# step 5: order each user's candidates by count (descending), breaking ties
# by the candidate id compared as an integer (ascending)
sorted_recommend_friend_pairs = recommend_friend_pairs.map(
    lambda entry: (
        entry[0],
        sorted(
            entry[1],
            key=lambda candidate: (-candidate[1], int(candidate[0])),
        ),
    )
)

In [0]:
# Run the pipeline and bring every (user_id, sorted candidates) row back to
# the driver; the later cells iterate over `result` in plain Python.
result = sorted_recommend_friend_pairs.collect()

In [17]:
# problem 1.c

user_ids = ['11', '924', '8941', '8942', '9019', '9020', '9021', '9022', '9990', '9992', '9993']

# Index the collected rows by user id once instead of rescanning all of
# `result` for every requested id. Keys are unique because the rows come out
# of groupByKey, so the dict holds exactly one entry per user.
recommendations_by_user = dict(result)

for user_id in user_ids:
    # Users missing from the result are skipped silently, as before.
    if user_id in recommendations_by_user:
        recommendation_ids = [
            recommendation[0]
            for recommendation in recommendations_by_user[user_id]
        ]
        print(user_id, recommendation_ids)

11 ['27552', '7785', '27573', '27574', '27589', '27590', '27600', '27617', '27620', '27667', '32072', '33192', '10', '12', '110', '638', '1797', '2141', '5784', '6893', '7717', '7849', '8773', '9154', '9998', '10253', '10782', '12666', '23586', '25186', '25195', '25205', '25256', '26976', '27555', '27564', '27570', '27583', '27587', '27588', '27626', '27638', '27655', '27661', '30214', '32128', '32233', '32505', '33017', '35451', '38737', '45018', '1', '2', '3', '4', '5', '6', '7', '8', '9', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '90', '91', '92', '93', '94', '106', '131

In [0]:
# output recommendations: one line per user, formatted as
# "<user_id>\t<candidate>,<candidate>,...\n"
with open("output.txt","w+") as out_file:
    for cur_id, recommendations in result:
        joined_ids = ','.join([candidate[0] for candidate in recommendations])
        out_file.write(str(cur_id) + '\t' + joined_ids + '\n')