# Implementing PageRank 


### Setup

Let's set up Spark in your Colab environment. Run the cell below!

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
Collecting py4j==0.10.7
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=18d83c73056ff7e4ebd86072b8030a552398d175a8210bd3740b609724af4574
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5

Now we authenticate a Google Drive client to download the files we will be processing in our Spark job.

**Make sure to follow the interactive instructions.**

In [0]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [0]:
# id='1QtPy_HuIMSzhtYllT3-WeM3Sqg55wK_D'
# downloaded = drive.CreateFile({'id': id})
# downloaded.GetContentFile('MovieLens.training')

# id='1ePqnsQTJRRvQcBoF2EhoPU8CU1i5byHK'
# downloaded = drive.CreateFile({'id': id})
# downloaded.GetContentFile('MovieLens.test')

# id='1ncUBWdI5AIt3FDUJokbMqpHD2knd5ebp'
# downloaded = drive.CreateFile({'id': id})
# downloaded.GetContentFile('MovieLens.item')

If you executed the cells above, you should be able to see the dataset we will use for this Colab under the "Files" tab on the left panel.

Next, we import some of the common libraries needed for our task.

In [0]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Let's initialize the Spark context.

In [0]:
# configure Spark (the web UI is served on port 4050)
conf = SparkConf().set("spark.ui.port", "4050")

# create the context, then the session that wraps it
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

You can check the current Spark version and get a link to the web interface by displaying the session object. In the Spark UI, you can monitor the progress of your job and debug performance bottlenecks (if your Colab is running with a **local runtime**).

In [5]:
spark

If you are running this Colab on the Google-hosted runtime, the cell below will create an *ngrok* tunnel that still lets you check the Spark UI.

In [0]:
# !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
# !unzip ngrok-stable-linux-amd64.zip
# get_ipython().system_raw('./ngrok http 4050 &')
# !curl -s http://localhost:4040/api/tunnels | python3 -c \
#     "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

In [7]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


### Data Extraction

In [0]:
# Each input line is "source<TAB>destination"; emit ((source, destination), 1)
def splitLine(line):
  vals = line.split("\t")
  return ((vals[0], vals[1]), 1)


##### Small

In [25]:
edges_small = sc.textFile("graph-small.txt").map(lambda r: splitLine(r))

edges_small.take(1)

[(('100', '1'), 1)]

In [26]:
edges_small.count()

1024

In [27]:
unique_edges_small = edges_small.groupByKey().map(lambda r: r[0])

unique_edges_small.take(1)

[('100', '1')]

In [28]:
unique_edges_small.count()

950
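The deduplication above groups identical `(source, destination)` keys and then keeps only the key. As a rough illustration of the same idea outside Spark, the sketch below parses tab-separated lines, removes duplicate edges with a set, and counts out-degrees. The sample `lines` and the helper name `parse_edge` are made up for this example and are not part of the dataset or the notebook.

```python
from collections import Counter

def parse_edge(line):
    """Split a 'source<TAB>destination' line into a (source, destination) tuple."""
    source, destination = line.strip().split("\t")
    return (source, destination)

# Hypothetical sample input; the third line repeats the first edge.
lines = ["1\t2", "1\t3", "1\t2", "2\t3"]

edges = [parse_edge(line) for line in lines]
unique_edges = set(edges)  # duplicates collapse here

# Out-degree of a node = number of distinct edges leaving it.
out_degree = Counter(source for source, _ in unique_edges)

print(len(edges), len(unique_edges))
print(sorted(out_degree.items()))
```

In Spark, mapping each record to its `(source, destination)` key and calling `distinct()` on the resulting RDD should yield the same set of edges without the intermediate grouping.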

In [30]:
unique_nodes_small = unique_edges_small.flatMap(lambda r: [r[0], r[1]])

unique_nodes_small_list = sorted(set(unique_nodes_small.collect()), key=lambda num: int(num))

n_small = len(unique_nodes_small_list)
n_small

100

##### Full

In [31]:
edges_full = sc.textFile("graph-full.txt").map(lambda r: splitLine(r))

edges_full.take(1)

[(('1', '2'), 1)]

In [115]:
edges_full.count()

8192

In [33]:
unique_edges_full = edges_full.groupByKey().map(lambda r: r[0])

unique_edges_full.take(1)

[('2', '3')]

In [34]:
unique_edges_full.count()

8161

In [35]:
unique_nodes_full = unique_edges_full.flatMap(lambda r: [r[0], r[1]])

unique_nodes_full_list = sorted(set(unique_nodes_full.collect()), key=lambda num: int(num))

n_full = len(unique_nodes_full_list)
n_full

1000

In [0]:
from collections import defaultdict

##### Out Degree

###### Small

In [36]:
out_degree_small = unique_edges_small.map(lambda n: (n[0], 1)).reduceByKey(lambda a,b: a+b).collectAsMap()
out_degree_small['10']

14

In [37]:
out_tuple_count_small = unique_edges_small.map(lambda n: ((n[0], n[1]), 1)).reduceByKey(lambda a,b: 1)

out_tuple_count_small.take(1)

[(('100', '1'), 1)]

In [39]:
out_edge_exists_small = set(out_tuple_count_small.collectAsMap().keys())
('100', '1') in out_edge_exists_small

True

###### Full

In [116]:
out_degree_full = unique_edges_full.map(lambda n: (n[0], 1)).reduceByKey(lambda a,b: a+b).collectAsMap()
out_degree_full['10']

5

In [41]:
out_tuple_count_full = unique_edges_full.map(lambda n: ((n[0], n[1]), 1)).reduceByKey(lambda a,b: 1)

out_tuple_count_full.take(1)

[(('2', '3'), 1)]

In [42]:
out_edge_exists_full = set(out_tuple_count_full.collectAsMap().keys())
('2', '3') in out_edge_exists_full

True

### Stochastic Web Matrix

##### Small

In [43]:
M_small = np.zeros((n_small, n_small))

for i in range(n_small):
  for j in range(n_small):
    key = (str(j), str(i))
    if key in out_edge_exists_small:
      M_small[j, i] = out_degree_small[str(i)] ** -1

np.sum(M_small)    

115.40748303983597
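In a column-stochastic matrix with no dead ends, every column sums to 1, so the total should equal the number of nodes. The sum printed above (about 115.4 for 100 nodes) suggests the construction may not match that definition. Two things seem worth checking: the node IDs in the file appear to start at 1 while the loop indices start at 0, and entry `M[j, i]` should be non-zero when the edge runs from `i` to `j`. The sketch below shows one possible construction on a made-up three-node graph; `toy_edges`, `index_of`, and `M` here are illustrative names, not the notebook's variables.

```python
# Hypothetical toy graph with 1-based string IDs, stored as (source, destination).
toy_edges = {("1", "2"), ("1", "3"), ("2", "3"), ("3", "1")}

# Map each node ID to a 0-based matrix index: '1' -> 0, '2' -> 1, ...
nodes = sorted({node for edge in toy_edges for node in edge}, key=int)
index_of = {node: i for i, node in enumerate(nodes)}
n = len(nodes)

out_degree = {}
for source, _ in toy_edges:
    out_degree[source] = out_degree.get(source, 0) + 1

# M[j][i] = 1 / out_degree(i) when there is an edge i -> j, else 0.
M = [[0.0] * n for _ in range(n)]
for source, destination in toy_edges:
    M[index_of[destination]][index_of[source]] = 1.0 / out_degree[source]

# Sanity check: each column should sum to 1 when there are no dead ends.
column_sums = [sum(M[row][col] for row in range(n)) for col in range(n)]
print(column_sums)
```

Iterating over the edge set instead of all `n * n` index pairs also avoids the nested loop, which matters more for the full graph.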

In [52]:
out_degree_small['1']

14

In [47]:
M_small.shape

(100, 100)

In [51]:
M_small[:,1]

array([0.        , 0.        , 0.        , 0.07142857, 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.07142857, 0.        , 0.07142857, 0.07142857,
       0.        , 0.        , 0.07142857, 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.07142857,
       0.07142857, 0.        , 0.        , 0.07142857, 0.        ,
       0.07142857, 0.07142857, 0.        , 0.07142857, 0.        ,
       0.        , 0.        , 0.        , 0.07142857, 0.        ,
       0.        , 0.        , 0.        , 0.        , 0.07142857,
       0.        , 0.07142857, 0.        , 0.        , 0.        ,
       0.07142857, 0.07142857, 0.        , 0.07142857, 0.        ,
       0.        , 0.07142857, 0.        , 0.07142857, 0.07142857,
       0.        , 0.        , 0.        , 0.        , 0.07142857,
       0.07142857, 0.        , 0.        , 0.        , 0.        ,
       0.        , 0.        , 0.        , 0.07142857, 0.     

##### Full

In [54]:
M_full = np.zeros((n_full, n_full))

for i in range(n_full):
  for j in range(n_full):
    key = (str(j), str(i))
    if key in out_edge_exists_full:
      M_full[j, i] = out_degree_full[str(i)] ** -1

np.sum(M_full)

1128.7550039528755

In [55]:
M_full.shape

(1000, 1000)

### PageRank Eigenvector

In [0]:
# Initialize every node with a PageRank of 1/n

##### Small

In [56]:
page_rank_small = np.zeros(n_small) + 1/n_small

np.sum(page_rank_small)

0.9999999999999999

In [69]:
page_rank_small[0]

0.01

##### Full

In [57]:
page_rank_full = np.zeros(n_full) + 1/n_full

np.sum(page_rank_full)

1.0000000000000004

In [70]:
page_rank_full[0]

0.001

### Teleport

In [0]:
# Teleport probability = 1 - beta
beta = 0.8
teleport_prob = 1 - beta

In [117]:
teleport_vector_small = np.zeros(n_small) + teleport_prob/n_small
teleport_vector_small[0]

0.0019999999999999996

In [118]:
teleport_vector_full = np.zeros(n_full) + teleport_prob/n_full
teleport_vector_full[0]

0.00019999999999999996
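For reference, the teleport vector enters the power-iteration update in the usual way. The formula below is the standard formulation for a graph without dead ends, written out here as a reading aid, where $\beta$ is the probability of following a link, $n$ is the number of nodes, $M$ is the stochastic matrix, and $\mathbf{1}$ is the all-ones vector:

$$
r^{(t+1)} = \frac{1-\beta}{n}\,\mathbf{1} + \beta\, M\, r^{(t)}
$$

With $\beta = 0.8$ and $n = 100$, each entry of the teleport term is $(1 - 0.8)/100 = 0.002$, which matches the value printed for the small graph up to floating-point rounding.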

### Power Iteration

In [0]:
# PageRank update: r = teleport_vector + beta * M r, repeated k times

k = 40

pr_small = page_rank_small.copy()
pr_full = page_rank_full.copy()

for i in range(k):
  # print("teleport_vector_small=", teleport_vector_small, "beta=", beta, "dot_prod=", np.dot(M_small, pr_small))
  pr_small = teleport_vector_small + beta * np.dot(M_small, pr_small)
  pr_full = teleport_vector_full + beta * np.dot(M_full, pr_full)
  # print("pr_small[1]=", pr_small[1])
  # print("pr_full[1]=", pr_full[1])
  # print(np.linalg.norm(pr_small))
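A fixed count of 40 iterations works when the ranks have settled by then; one way to confirm this is to track how much the vector changes between steps. The following is a minimal pure-Python sketch on a made-up three-node graph, assuming a column-stochastic matrix with no dead ends. The names `power_iterate` and `tol` are hypothetical and not part of the notebook.

```python
def power_iterate(M, beta=0.8, max_iter=40, tol=1e-10):
    """Repeat r <- (1 - beta)/n + beta * M r until the L1 change is below tol."""
    n = len(M)
    r = [1.0 / n] * n  # start from the uniform distribution
    for step in range(1, max_iter + 1):
        new_r = [
            (1.0 - beta) / n + beta * sum(M[j][i] * r[i] for i in range(n))
            for j in range(n)
        ]
        change = sum(abs(a - b) for a, b in zip(new_r, r))
        r = new_r
        if change < tol:
            break
    return r, step

# Hypothetical column-stochastic matrix for the edges 1->2, 1->3, 2->3, 3->1.
M = [
    [0.0, 0.0, 1.0],
    [0.5, 0.0, 0.0],
    [0.5, 1.0, 0.0],
]

ranks, steps = power_iterate(M)
print(ranks, steps)
```

Because every column of `M` sums to 1, the entries of `ranks` keep summing to 1 after each update, which gives a cheap invariant to assert while debugging.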

In [0]:
# pr_small

In [120]:
max_pr_small_idx = np.argmax(pr_small) 
max_pr_small_idx

59

In [121]:
highest_pr_idx = pr_small.argsort()[-5:][::-1]
for idx in highest_pr_idx:
  print("pr_index=", idx, "pr_value=", pr_small[idx])

pr_index= 59 pr_value= 0.01749885039395662
pr_index= 45 pr_value= 0.01688346361709461
pr_index= 80 pr_value= 0.016272086733217724
pr_index= 83 pr_value= 0.016125973293865548
pr_index= 1 pr_value= 0.015455644197358502


In [97]:
lowest_pr_idx = pr_small.argsort()[::-1][-5:]
for idx in lowest_pr_idx:
  print("pr_index=", idx, "pr_value=", pr_small[idx])

pr_index= 63 pr_value= 0.005864686514667147
pr_index= 15 pr_value= 0.0057998068882654675
pr_index= 5 pr_value= 0.005602408426993661
pr_index= 52 pr_value= 0.005056751932507394
pr_index= 0 pr_value= 0.0019999999999999996
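The slice `argsort()[-5:][::-1]` selects the five largest entries in descending order, while `argsort()[::-1][-5:]` selects the five smallest, also listed from larger to smaller. A standard-library alternative is sketched below on made-up scores; `scores`, `top2`, and `bottom2` are illustrative names only.

```python
import heapq

# Hypothetical scores indexed by position.
scores = [0.10, 0.40, 0.05, 0.30, 0.15]

# Indices of the two largest scores, largest first.
top2 = heapq.nlargest(2, range(len(scores)), key=scores.__getitem__)

# Indices of the two smallest scores, smallest first.
bottom2 = heapq.nsmallest(2, range(len(scores)), key=scores.__getitem__)

for idx in top2:
    print("index=", idx, "value=", scores[idx])
for idx in bottom2:
    print("index=", idx, "value=", scores[idx])
```

Note that `heapq.nsmallest` returns the smallest entry first, which is the reverse of the order printed by the `argsort` slice above.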


In [0]:
# pr_full

In [64]:
max_pr_full_idx = np.argmax(pr_full) 
max_pr_full_idx

840

In [132]:
pr_full[max_pr_full_idx]

0.002310102017249554

In [99]:
highest_pr_idx = pr_full.argsort()[-5:][::-1]
for idx in highest_pr_idx:
  print("pr_index=", idx, "pr_value=", pr_full[idx])

pr_index= 840 pr_value= 0.002310102017249554
pr_index= 155 pr_value= 0.0022004576615845635
pr_index= 234 pr_value= 0.00203887545379242
pr_index= 137 pr_value= 0.002017045601032364
pr_index= 666 pr_value= 0.0019940983192039846


In [100]:
lowest_pr_idx = pr_full.argsort()[::-1][-5:]
for idx in lowest_pr_idx:
  print("pr_index=", idx, "pr_value=", pr_full[idx])

pr_index= 229 pr_value= 0.0005087693573589022
pr_index= 769 pr_value= 0.0005065716245767926
pr_index= 262 pr_value= 0.00041564802743984013
pr_index= 218 pr_value= 0.00041420359222972764
pr_index= 0 pr_value= 0.00019999999999999996


# Implementing HITS

### Link Matrix

In [184]:
L_small = np.zeros((n_small, n_small))

for i in range(n_small):
  for j in range(n_small):
    key = (str(j), str(i))
    if key in out_edge_exists_small:
      L_small[j, i] = 1

np.sum(L_small)   

# Matrix to RDD
# L_small = sc.parallelize(L_small)

924.0

In [185]:
L_full = np.zeros((n_full, n_full))

for i in range(n_full):
  for j in range(n_full):
    key = (str(j), str(i))
    if key in out_edge_exists_full:
      L_full[j, i] = 1

np.sum(L_full)

# Matrix to RDD
# L_full = sc.parallelize(L_full)

8142.0

### Hubbiness and Authority Vector

In [0]:
lambda_factor = 1
mu_factor = 1

##### Small

In [206]:
hubbiness_small = np.zeros(n_small) + 1

for i in range(40):
  # auth
  authority_small = np.dot(L_small.T, hubbiness_small)
  max_auth_small_idx = np.argmax(authority_small)
  authority_small /= authority_small[max_auth_small_idx]
  # hub
  hubbiness_small = np.dot(L_small, authority_small)
  max_hubb_small_idx = np.argmax(hubbiness_small)
  hubbiness_small /= hubbiness_small[max_hubb_small_idx]
                                    
np.sum(authority_small)



22.305924840153427
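The loop above alternates two matrix products and rescales each result so that its largest entry is 1. To make that structure easier to follow, here is a small pure-Python sketch on a made-up graph. The names `hits` and `toy_links` are illustrative, and the sketch assumes `L[i][j] = 1` when node `i` links to node `j`.

```python
def hits(L, iterations=40):
    """Alternate a = L^T h and h = L a, scaling each so its maximum is 1."""
    n = len(L)
    hubs = [1.0] * n
    authorities = [0.0] * n
    for _ in range(iterations):
        # Authority of j: sum of hub scores of the nodes that link to j.
        authorities = [sum(L[i][j] * hubs[i] for i in range(n)) for j in range(n)]
        top = max(authorities)
        authorities = [a / top for a in authorities]
        # Hubbiness of i: sum of authority scores of the nodes i links to.
        hubs = [sum(L[i][j] * authorities[j] for j in range(n)) for i in range(n)]
        top = max(hubs)
        hubs = [h / top for h in hubs]
    return hubs, authorities

# Hypothetical link matrix for the edges 0 -> 1, 0 -> 2, 1 -> 2, 2 -> 0.
toy_links = [
    [0, 1, 1],
    [0, 0, 1],
    [1, 0, 0],
]

hubs, authorities = hits(toy_links)
print(hubs)
print(authorities)
```

Scaling by the maximum keeps the scores bounded without changing their ranking; the sketch assumes the graph has at least one edge, since otherwise the maximum would be zero.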

In [207]:
max_hubb_small_idx = np.argmax(hubbiness_small)
max_hubb_small_idx

59

In [208]:
hubbiness_small[max_hubb_small_idx]

1.0

In [229]:
# highest hubbiness
hubb_small_idx = hubbiness_small.argsort()[-5:][::-1]

for idx in hubb_small_idx:
  print("hubbinessNode=", idx, "hubbinessValue=", hubbiness_small[idx])

hubbinessNode= 59 hubbinessValue= 1.0
hubbinessNode= 39 hubbinessValue= 0.9771360012807863
hubbinessNode= 11 hubbinessValue= 0.9639809836688735
hubbinessNode= 79 hubbinessValue= 0.9519097006973478
hubbinessNode= 38 hubbinessValue= 0.9301632834840544


In [236]:
# lowest hubbiness
hubb_small_idx = hubbiness_small.argsort()[::-1][-5:]

for idx in hubb_small_idx:
  print("hubbinessNode=", idx, "hubbinessValue=", hubbiness_small[idx])

hubbinessNode= 95 hubbinessValue= 0.2335579159381473
hubbinessNode= 15 hubbinessValue= 0.214627402772886
hubbinessNode= 9 hubbinessValue= 0.21398772416868803
hubbinessNode= 35 hubbinessValue= 0.2126461041397039
hubbinessNode= 0 hubbinessValue= 0.0


In [232]:
# highest authority
auth_small_idx = authority_small.argsort()[-5:][::-1]

for idx in auth_small_idx:
  print("authorityNode=", idx, "authorityValue=", authority_small[idx])

authorityNode= 66 authorityValue= 1.0
authorityNode= 40 authorityValue= 0.9416553120606718
authorityNode= 27 authorityValue= 0.9105944749808419
authorityNode= 53 authorityValue= 0.8842590255372046
authorityNode= 1 authorityValue= 0.7838455296960767


In [237]:
# lowest authority
auth_small_idx = authority_small.argsort()[::-1][-5:]

for idx in auth_small_idx:
  print("authorityNode=", idx, "authorityValue=", authority_small[idx])

authorityNode= 24 authorityValue= 0.06332243994003275
authorityNode= 33 authorityValue= 0.0532651655517216
authorityNode= 54 authorityValue= 0.048917626249540776
authorityNode= 71 authorityValue= 0.044910039876904125
authorityNode= 0 authorityValue= 0.0


##### Full

In [210]:
hubbiness_full = np.zeros(n_full) + 1


for i in range(40):
  # auth
  authority_full = np.dot(L_full.T, hubbiness_full)
  max_auth_full_idx = np.argmax(authority_full)
  authority_full /= authority_full[max_auth_full_idx]
  # hub
  hubbiness_full = np.dot(L_full, authority_full)
  max_hubb_full_idx = np.argmax(hubbiness_full)
  hubbiness_full /= hubbiness_full[max_hubb_full_idx]
                                    
np.sum(authority_full)



403.28990881704624

In [211]:
max_hubb_full_idx = np.argmax(hubbiness_full)
max_hubb_full_idx

840

In [212]:
hubbiness_full[max_hubb_full_idx]

1.0

In [226]:
# highest hubbiness
hubb_full_idx = hubbiness_full.argsort()[-5:][::-1]

for idx in hubb_full_idx:
  print("hubbinessNode=", idx, "hubbinessValue=", hubbiness_full[idx])

hubbinessNode= 840 hubbinessValue= 1.0
hubbinessNode= 155 hubbinessValue= 0.9406193908295618
hubbinessNode= 234 hubbinessValue= 0.9008896222258238
hubbinessNode= 389 hubbinessValue= 0.8643225118529235
hubbinessNode= 472 hubbinessValue= 0.8538041364002846


In [239]:
# lowest hubbiness
hubb_full_idx = hubbiness_full.argsort()[::-1][-5:]

for idx in hubb_full_idx:
  print("hubbinessNode=", idx, "hubbinessValue=", hubbiness_full[idx])

hubbinessNode= 539 hubbinessValue= 0.0664127083864826
hubbinessNode= 141 hubbinessValue= 0.06459793639398076
hubbinessNode= 835 hubbinessValue= 0.057533765430976824
hubbinessNode= 23 hubbinessValue= 0.042278926335992326
hubbinessNode= 0 hubbinessValue= 0.0


In [227]:
# highest authority
auth_full_idx = authority_full.argsort()[-5:][::-1]

for idx in auth_full_idx:
  print("authorityNode=", idx, "authorityValue=", authority_full[idx])

authorityNode= 893 authorityValue= 1.0
authorityNode= 16 authorityValue= 0.95555173239123
authorityNode= 799 authorityValue= 0.9461836076135997
authorityNode= 146 authorityValue= 0.9214448520496178
authorityNode= 473 authorityValue= 0.8961985568969516


In [238]:
# lowest authority
auth_full_idx = authority_full.argsort()[::-1][-5:]

for idx in auth_full_idx:
  print("authorityNode=", idx, "authorityValue=", authority_full[idx])

authorityNode= 24 authorityValue= 0.08160344203379763
authorityNode= 462 authorityValue= 0.07527833036615698
authorityNode= 135 authorityValue= 0.06632518878601466
authorityNode= 19 authorityValue= 0.05615124630576268
authorityNode= 0 authorityValue= 0.0
