In [45]:
# pyspark --conf “spark.ui.port=10101”

!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u292-b10-0ubuntu1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 34 not upgraded.


In [46]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [47]:
# Let's import the libraries we will need
import pandas as pd
import numpy as np
from numpy import linalg
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql.functions import monotonically_increasing_id

In [48]:
sc = SparkContext.getOrCreate()
sc.stop()

# create the Spark Session
spark = SparkSession.builder.getOrCreate()

# create the Spark Context
sc = spark.sparkContext

# Problem 2

## Page rank implementation

In [328]:
def make_data(line):
  splitted = line.split()
  node1 = int(splitted[0])
  node2 = int(splitted[1])
  return node1, node2
  
def generateM(x):
		src, dist_list = x
		deg = len(dist_list)
		return [(dist, src, 1./deg) for dist in dist_list]

def generateL(x):
		src, dist_list = x
		return [(dist, src, 1) for dist in dist_list]

def unique(lst):
  unique_lst = []
  for element in lst[1]:
    if element not in unique_lst:
      unique_lst.append(element)
  return lst[0], unique_lst

In [341]:
beta = 0.8
n = 1000

In [342]:
lines = sc.textFile("graph-full.txt")

In [343]:
data = lines.map(make_data)
data = data.groupByKey().mapValues(list)

In [344]:
data = data.map(lambda x: unique(x))

In [233]:
M = data.flatMap(lambda x: generateM(x))	

In [199]:
r = [1/n for _ in range(n)]

In [200]:
for _ in range(40):
	muliplications = M.map(lambda x: (x[0], x[2] * r[int(x[1]) - 1]))
	r_rdd = muliplications.reduceByKey(lambda x1, x2: x1 + x2)
	r_rdd = r_rdd.map(lambda x: (x[0], beta * x[1] + (1 - beta) / n))
  # Update r
	r = r_rdd.sortBy(lambda x: x[0]).map(lambda x: x[1]).collect()

In [201]:
R = []
for i in range(1, n + 1):
  R.append((i, r[i - 1])) 

In [202]:
R_sorted = sorted(R, key = lambda x : x[1])

In [206]:
bottom = R_sorted[:5]
top = R_sorted[-5:][::-1]

In [207]:
bottom

[(558, 0.0003286018525215297),
 (93, 0.0003513568937516577),
 (62, 0.00035314810510596274),
 (424, 0.0003548153864930145),
 (408, 0.00038779848719291705)]

In [208]:
top

[(263, 0.0020202911815182184),
 (537, 0.0019433415714531497),
 (965, 0.0019254478071662631),
 (243, 0.0018526340162417312),
 (285, 0.0018273721700645144)]

# HITS implementation

In [345]:
lam = 1
mi = 1
n = 1000
h = [1 for _ in range(n)]

In [346]:
L = data.flatMap(lambda x: generateL(x))	

In [347]:
L_transpose = L.map(lambda x: (x[1], x[0], 1))

In [348]:
for _ in range(40):
  muliplications_a = L_transpose.map(lambda x: (x[0], x[2] * h[int(x[1]) - 1]))
  a_rdd = muliplications_a.reduceByKey(lambda x1, x2: x1 + x2)
  a = a_rdd.sortBy(lambda x: x[0]).map(lambda x: x[1]).collect()
  # Normalization
  max_a = np.max(a)
  a = np.array(a) * (1/max_a)

  multiplications_h = L.map(lambda x: (x[0], x[2] * a[int(x[1]) - 1]))
  h_rdd = multiplications_h.reduceByKey(lambda x1, x2: x1 + x2)
  h = h_rdd.sortBy(lambda x: x[0]).map(lambda x: x[1]).collect()
  # Normalization
  max_h = np.max(h)
  h = np.array(h) * (1/max_h)

In [349]:
A = []
for i in range(1, n + 1):
  A.append((i, a[i - 1])) 

In [350]:
H = []
for i in range(1, n + 1):
  H.append((i, h[i - 1])) 

In [351]:
A_sorted = sorted(A, key = lambda x : x[1])
H_sorted = sorted(H, key = lambda x : x[1])

In [352]:
bottom_a = A_sorted[:5]
top_a = A_sorted[-5:][::-1]
bottom_h = H_sorted[:5]
top_h = H_sorted[-5:][::-1]

In [353]:
bottom_a

[(23, 0.042066854890936534),
 (835, 0.057790593544330145),
 (141, 0.06453117646225179),
 (539, 0.06602659373418492),
 (889, 0.07678413939216452)]

In [354]:
top_a

[(840, 1.0),
 (155, 0.9499618624906543),
 (234, 0.8986645288972263),
 (389, 0.863417110184379),
 (472, 0.8632841092495217)]

In [355]:
bottom_h

[(19, 0.05608316377607617),
 (135, 0.06653910487622793),
 (462, 0.07544228624641901),
 (24, 0.08171239406816944),
 (910, 0.08571673456144877)]

In [356]:
top_h

[(893, 0.9999999999999999),
 (16, 0.9635572849634398),
 (799, 0.9510158161074014),
 (146, 0.9246703586198444),
 (473, 0.8998661973604049)]