In [197]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u402-ga-2ubuntu1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.


In [198]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import re
import tqdm
import psutil

from google.colab import drive

# create spark session (getOrCreate reuses an existing session on re-run)
spark = SparkSession.builder.getOrCreate()

# create spark context
sc = spark.sparkContext

# psutil reports the host machine's physical RAM. That is an upper bound on what
# Spark can use, not the amount Spark was configured with; the configured amount
# comes from spark.driver.memory, printed separately below.
mem_info = psutil.virtual_memory()
total_memory_gb = mem_info.total / (1024 * 1024 * 1024)
print("Total host memory available to the runtime:", int(total_memory_gb), "GB")
print("Spark driver memory setting:", sc.getConf().get("spark.driver.memory", "not set (Spark default)"))

Total Memory Allocated to Spark Runtime: 50 GB


In [199]:
# load data
drive.mount('/content/drive')

# Both graph files live in the same course folder on Google Drive.
DATA_DIR = "/content/drive/MyDrive/Colab Notebooks/cse 547 - HW 3"

# Edge lists are tab-separated (source, destination) pairs, read without a header
# row, so the columns come out as _c0/_c1.
full = spark.read.csv(os.path.join(DATA_DIR, "graph-full.txt"), sep='\t', inferSchema=True)
small = spark.read.csv(os.path.join(DATA_DIR, "graph-small.txt"), sep='\t', inferSchema=True)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [219]:
# Pick which graph to analyse (`full` or `small`) and expose its rows as an RDD.
dataset = full
rdd = dataset.rdd
dataset.show()

+---+---+
|_c0|_c1|
+---+---+
|  1|  2|
|  2|  3|
|  3|  4|
|  4|  5|
|  5|  6|
|  6|  7|
|  7|  8|
|  8|  9|
|  9| 10|
| 10| 11|
| 11| 12|
| 12| 13|
| 13| 14|
| 14| 15|
| 15| 16|
| 16| 17|
| 17| 18|
| 18| 19|
| 19| 20|
| 20| 21|
+---+---+
only showing top 20 rows



In [220]:
# Eliminate duplicate links: turn Rows into plain (src, dst) tuples, then dedupe.
edges_raw = rdd.map(lambda row: (row[0], row[1]))
print(edges_raw.count())

# M feeds many later Spark jobs (filters, joins, counts). Cache it so the CSV
# read and the distinct() shuffle are not recomputed for each of them.
M = edges_raw.distinct().cache()
print(M.count())
M.take(20)

8192
8161


[(1, 2),
 (2, 3),
 (3, 4),
 (4, 5),
 (5, 6),
 (6, 7),
 (7, 8),
 (8, 9),
 (9, 10),
 (10, 11),
 (11, 12),
 (12, 13),
 (13, 14),
 (14, 15),
 (15, 16),
 (16, 17),
 (17, 18),
 (18, 19),
 (19, 20),
 (20, 21)]

In [221]:
# Attach a unit weight to every distinct link: (src, dst) -> (src, dst, 1).
L = M.map(lambda edge: edge + (1,))
L.take(20)

[(1, 2, 1),
 (2, 3, 1),
 (3, 4, 1),
 (4, 5, 1),
 (5, 6, 1),
 (6, 7, 1),
 (7, 8, 1),
 (8, 9, 1),
 (9, 10, 1),
 (10, 11, 1),
 (11, 12, 1),
 (12, 13, 1),
 (13, 14, 1),
 (14, 15, 1),
 (15, 16, 1),
 (16, 17, 1),
 (17, 18, 1),
 (18, 19, 1),
 (19, 20, 1),
 (20, 21, 1)]

In [222]:
# Count the out degree of each node, then keep its reciprocal 1/deg(i): the
# weight each outgoing link of node i gets in the PageRank matrix below.
degs = (
    M.map(lambda edge: (edge[0], 1))
     .reduceByKey(lambda x, y: x + y)
     .mapValues(lambda d: 1 / d)
)
n = degs.count()  # number of nodes that have at least one out-link

# The matrix code below stores node `id` at index id - 1 of an n x n matrix.
# That is only valid if ids are exactly 1..n and every node has an out-link
# (no dead ends), so check it instead of assuming it.
node_ids = M.flatMap(lambda edge: edge).distinct().collect()
assert sorted(node_ids) == list(range(1, n + 1)), \
    "node ids are not exactly 1..n, or some node has no out-links (dead end)"
print(n)

1000


In [223]:
## HITS
# form rows and columns of L

from scipy.sparse import csr_matrix

# Pull all (src, dst, weight) triples to the driver in ONE Spark job and build
# the n x n link matrix, where entry [i, j] is the weight of link (i+1) -> (j+1).
# A separate filter/collect per node would launch 2 * n jobs.
link_triples = L.collect()
link_rows = [src - 1 for src, dst, w in link_triples]
link_cols = [dst - 1 for src, dst, w in link_triples]
link_vals = [float(w) for src, dst, w in link_triples]
L_mat = csr_matrix((link_vals, (link_rows, link_cols)), shape=(n, n))

# Lrows[i]: row i of the link matrix as a 1 x n sparse matrix (out-links of node i+1).
# Lcols[i]: column i as an n x 1 sparse matrix (in-links of node i+1).
Lrows = [L_mat.getrow(i) for i in range(n)]
Lcols = [L_mat.getcol(i) for i in range(n)]

print(len(Lrows))
print(Lrows[0])
print(len(Lcols))
print(Lcols[0])

1000
  (0, 1)	1.0
  (0, 501)	1.0
  (0, 530)	1.0
  (0, 585)	1.0
  (0, 688)	1.0
  (0, 903)	1.0
1000
  (54, 0)	1.0
  (100, 0)	1.0
  (167, 0)	1.0
  (226, 0)	1.0
  (250, 0)	1.0
  (397, 0)	1.0
  (487, 0)	1.0
  (685, 0)	1.0
  (893, 0)	1.0
  (912, 0)	1.0
  (999, 0)	1.0


In [224]:
from scipy.sparse import vstack as sparse_vstack

NUM_HITS_ITERS = 40

# Reassemble the link matrix from its rows so that each HITS step is a single
# sparse matrix-vector product, not n separate element assignments.
L_hits = sparse_vstack(Lrows).tocsr()

# Initialize h and a (1-D dense vectors indexed by node id - 1)
h = np.ones(n)
a = np.zeros(n)

# Run HITS iterations: a = L^T h, then h = L a, each rescaled so its largest
# entry is 1. The ndarray .max() method is used on purpose: the builtin max is
# shadowed by `from pyspark.sql.functions import *`. Both maxima are positive
# because every node has an out-link (checked when n was computed).
for iteration in range(NUM_HITS_ITERS):
    a = L_hits.T @ h
    a = a / a.max()

    h = L_hits @ a
    h = h / h.max()

print("Check:")
print("top hubbiness node id = ", np.argmax(h) + 1, ",  value = ", h[np.argmax(h)])
print("top authority node id = ", np.argmax(a) + 1, ",  value = ", a[np.argmax(a)])

Check:
top hubbiness node id =  840 ,  value =  1.0
top authority node id =  893 ,  value =  1.0


In [225]:
# Function to print the top-k and bottom-k values of an array (k defaults to 5)
def t5b5(a, k=5):
    """Print the k largest and k smallest entries of a score vector.

    Node IDs are printed 1-based (array index + 1). Top entries are listed from
    the largest down. Bottom entries are listed from the k-th smallest down to
    the smallest.

    Args:
        a: 1-D array-like of scores, where index i holds the score of node i + 1.
        k: how many entries to show at each end (default 5). Clamped to len(a),
            so short inputs no longer raise IndexError.

    Returns:
        0, kept so that existing callers are unaffected.
    """
    a = np.asarray(a)
    # Avoid the builtin min(): in this notebook it is shadowed by
    # `from pyspark.sql.functions import *`.
    k = k if k <= len(a) else len(a)

    # Sort once and reuse the ordering for both ends.
    order = np.argsort(a)

    print(f"Top {k}")
    for idx in order[::-1][:k]:
        print("Node ID:", idx + 1, "  value:", a[idx])

    print(f"\nBottom {k}")
    for idx in order[:k][::-1]:
        print("Node ID:", idx + 1, "  value:", a[idx])

    return 0



# Report the extreme hub and authority scores from the HITS run.
for heading, scores in (("------Hubbiness------", h), ("\n\n------Authority------", a)):
    print(heading)
    t5b5(scores)

------Hubbiness------
Top 5
Node ID: 840   value: 1.0
Node ID: 155   value: 0.9499618624906543
Node ID: 234   value: 0.8986645288972264
Node ID: 389   value: 0.863417110184379
Node ID: 472   value: 0.8632841092495217

Bottom 5
Node ID: 889   value: 0.07678413939216454
Node ID: 539   value: 0.06602659373418492
Node ID: 141   value: 0.06453117646225179
Node ID: 835   value: 0.05779059354433016
Node ID: 23   value: 0.042066854890936534


------Authority------
Top 5
Node ID: 893   value: 1.0
Node ID: 16   value: 0.9635572849634398
Node ID: 799   value: 0.9510158161074016
Node ID: 146   value: 0.9246703586198444
Node ID: 473   value: 0.899866197360405

Bottom 5
Node ID: 910   value: 0.08571673456144878
Node ID: 24   value: 0.08171239406816946
Node ID: 462   value: 0.07544228624641902
Node ID: 135   value: 0.06653910487622794
Node ID: 19   value: 0.05608316377607618


In [226]:
## Page Rank

from scipy.sparse import csr_matrix

# Attach 1/deg(src) to every distinct link (src, dst) with ONE join + collect,
# rather than one filter + join + collect per node.
# degs.join(M) yields (src, (1/deg(src), dst)); re-key as 0-based (row, col, weight).
weighted_links = (
    degs.join(M)
        .map(lambda kv: (kv[0] - 1, kv[1][1] - 1, kv[1][0]))
        .collect()
)
link_weights = csr_matrix(
    ([w for src, dst, w in weighted_links],
     ([src for src, dst, w in weighted_links],
      [dst for src, dst, w in weighted_links])),
    shape=(n, n),
)

# Ms[i] is an n x 1 sparse column: entry j is 1/deg(j+1) if node j+1 links to
# node i+1, and 0 otherwise.
Ms = [link_weights.getcol(i) for i in range(n)]

print(len(Ms))

1000


In [227]:
from scipy.sparse import hstack as sparse_hstack

NUM_PAGERANK_ITERS = 40

# Probability of following a link (1 - beta is the teleportation probability)
beta = 0.8
teleporting = (1 - beta) / n

# Stack the per-node columns side by side; the transpose has entry [i, j] =
# 1/deg(j+1) for each link (j+1) -> (i+1). That makes it column-stochastic when
# every node has an out-link, so one step is r <- beta * M r + (1 - beta) / n.
M_pr_T = sparse_hstack(Ms).T.tocsr()

# Initialize r^(0) to the uniform distribution
r = np.full(n, 1 / n)

for iteration in range(NUM_PAGERANK_ITERS):
    r = beta * (M_pr_T @ r) + teleporting

# NOTE: the result is stored in `a` (overwriting the HITS authority vector)
# because the reporting cell that follows prints `a`.
a = r
# Without dead ends each step preserves total mass, so the scores must sum to 1.
assert np.isclose(a.sum(), 1.0), "PageRank scores do not sum to 1 (dead ends?)"
print("Check:")
print("top node id = ", np.argmax(a) + 1, ",  value = ", a[np.argmax(a)])

Check:
top node id =  263 ,  value =  0.0020202911815182184


In [228]:
# # Get the indices of the top 5 and bottom 5 values
# top5_indices = np.argsort(a)[-5:]
# bottom5_indices = np.argsort(a)[:5]

# # Get the top 5 and bottom 5 values and their corresponding indices
# top5_values = a[top5_indices]
# bottom5_values = a[bottom5_indices]

# # Print the results
# print("Top 5 Values:")
# for value, index in zip(top5_values, top5_indices):
#     print(f"ID: {index + 1}, Value: {value}")

# print("\nBottom 5 Values:")
# for value, index in zip(bottom5_values, bottom5_indices):
#     print(f"ID: {index + 1}, Value: {value}")

In [229]:
# Show the five highest- and five lowest-ranked pages by PageRank score.
pagerank_heading = "------PageRank------"
print(pagerank_heading)
t5b5(a);

------PageRank------
Top 5
Node ID: 263   value: 0.0020202911815182184
Node ID: 537   value: 0.0019433415714531497
Node ID: 965   value: 0.0019254478071662631
Node ID: 243   value: 0.001852634016241731
Node ID: 285   value: 0.0018273721700645144

Bottom 5
Node ID: 408   value: 0.00038779848719291705
Node ID: 424   value: 0.00035481538649301443
Node ID: 62   value: 0.00035314810510596274
Node ID: 93   value: 0.0003513568937516577
Node ID: 558   value: 0.0003286018525215297
