In [1]:
import re
import sys
from pyspark import SparkConf, SparkContext
import numpy as np

In [2]:
conf = SparkConf()
sc = SparkContext(conf=conf) 

In [3]:
small_dataset_path = "D:/BigDataMining/Lab3/graph-small.txt"
full_dataset_path = "D:/BigDataMining/Lab3/graph-full.txt"
iterations = 40
beta = 0.8

In [4]:
# get degree of each node
def getDegree(graphRDD):
    degrees = graphRDD.groupByKey().sortByKey().mapValues(set).map(lambda row: row[1]).collect()
    return degrees

# if (i -> j) then 1/deg(i), 0 otherwise
# define Matrix Mji by calculating 1/deg(i) of each row
def MatrixJI(row, degrees):
    result = [0] * len(degrees)
    for i in row:
        result[i-1] = 1 / len(degrees[i-1])
    return np.array(result)
    

# row[0]: source j, row[1]: destination i
# return Matrix M in RDD format
def getMatrixM(degrees, dataRDD):
    matrixM = dataRDD.map(lambda row: (int(row.split("\t")[1]), int(row.split("\t")[0]))).\
    groupByKey().sortByKey().mapValues(set).map(lambda row: MatrixJI(row[1], degrees))
    return matrixM

# matrix multiplication for each row
# row * row ex) [1,2] * transpose([2 * 1]) 
def matrix_multiplication(row, r):
    result = 0
    for i in range(len(row)):
        result += row[i] * r[i]
    return result

In [5]:
def page_rank(dataset_path, iterations, beta):
    # read file
    dataRDD = sc.textFile(dataset_path)
    # trim and separte each line into source node and destination node
    graphRDD = dataRDD.map(lambda row: (int(row.split("\t")[0]), int(row.split("\t")[1])))
    # get degrees
    degrees = getDegree(graphRDD)
    # get matrix M
    matrix_M = getMatrixM(degrees, dataRDD)
    n = len(degrees)
    # n x 1 vector with all entries equal to 1
    n_vector = [1] * n 
    # initialize r
    r = np.array(n_vector) / n
    # iterations
    for i in range(iterations):
        # compute pageRank
        r = (((1-beta)/n) * np.array(n_vector)) + \
        (beta * np.array(matrix_M.map(lambda row: matrix_multiplication(row, r)).collect()))
        print("*********************************")
        print("Iteration {}:".format(i+1))
        print("Top 5:")
        # get top 5 nodes
        top_5 = sorted(range(len(r)), key=lambda k: r[k], reverse=True)[:5]
        for j in top_5:
            print("{}: {}".format(j+1, r[j]))

        print("Bottom 5:")
        # get bottom 5 nodes
        bottom_5 = sorted(range(len(r)), key=lambda k: r[k])[:5]
        for j in bottom_5:
            print("{}: {}".format(j+1, r[j]))
    

In [6]:
page_rank(full_dataset_path, iterations, beta)

*********************************
Iteration 1:
Top 5:
263: 0.0018759018759018757
502: 0.0018663492063492068
126: 0.001843125763125763
285: 0.001819076479076479
146: 0.0018094549894549898
Bottom 5:
558: 0.00032987012987012985
93: 0.0003714285714285714
62: 0.00037272727272727273
424: 0.00037272727272727273
408: 0.00042153846153846153
*********************************
Iteration 2:
Top 5:
263: 0.00208871522584906
965: 0.0019612553136778245
537: 0.0018784860606765369
243: 0.0018250404103778125
285: 0.0018120241867479964
Bottom 5:
558: 0.0003279288446041692
93: 0.00034146361574932996
424: 0.0003505007114098023
62: 0.0003743160173160173
408: 0.0003929167721167721
*********************************
Iteration 3:
Top 5:
263: 0.002023806689320866
537: 0.001930221482836505
965: 0.0019265979884266348
243: 0.0018556711996323286
285: 0.001841159061219935
Bottom 5:
558: 0.0003282667952616296
93: 0.00035036546727648763
424: 0.00035365599001287314
62: 0.0003602146579730995
742: 0.00038778769171214726
***

*********************************
Iteration 26:
Top 5:
263: 0.0020202911815182215
537: 0.0019433415714531525
965: 0.0019254478071662731
243: 0.0018526340162417362
285: 0.0018273721700645127
Bottom 5:
558: 0.0003286018525215321
93: 0.000351356893751659
62: 0.00035314810510596025
424: 0.00035481538649301475
408: 0.00038779848719292117
*********************************
Iteration 27:
Top 5:
263: 0.0020202911815182163
537: 0.0019433415714531469
965: 0.0019254478071662634
243: 0.0018526340162417334
285: 0.001827372170064512
Bottom 5:
558: 0.00032860185252152937
93: 0.0003513568937516578
62: 0.00035314810510596214
424: 0.0003548153864930153
408: 0.0003877984871929175
*********************************
Iteration 28:
Top 5:
263: 0.002020291181518219
537: 0.0019433415714531503
965: 0.0019254478071662627
243: 0.0018526340162417312
285: 0.0018273721700645138
Bottom 5:
558: 0.0003286018525215298
93: 0.0003513568937516577
62: 0.00035314810510596285
424: 0.0003548153864930143
408: 0.000387798487192917

In [85]:
sc.stop()