# COSI 120A - Assignment 3 - PageRank

## Q1. Google PageRank Implementation

In [69]:
import numpy as np

def MYPageRank(M, s = .85, maxerr = .0001, max_itr = 100):
    """Compute PageRank by power iteration on the damped transition matrix.

    The iteration matrix is ``A = s * M + (1 - s) * N`` where every entry of
    ``N`` is ``1/n``.  Starting from the uniform vector ``1/n``, the rank
    vector is repeatedly replaced by ``A.dot(rank)`` until the L1 distance
    between two successive vectors is at most ``maxerr``.

    Parameters
    ----------
    M : numpy.ndarray
        Transition matrix.  It is multiplied from the left (``A.dot(r)``), so
        it must be square; the Q1 example passes a matrix whose columns each
        sum to 1.
    s : float
        Damping factor (weight given to ``M``).
    maxerr : float
        Convergence threshold on ``sum(abs(r_new - r_old))``.
    max_itr : int
        Maximum number of update steps to perform.

    Returns
    -------
    (numpy.ndarray, int)
        The final rank vector and an iteration counter.  When the threshold
        is met the counter is the zero-based index of the step at which it
        was met (so ``counter + 1`` steps were performed); when the loop runs
        out without converging the counter equals ``max_itr``.
    """
    n, d = M.shape

    # A does not depend on the current rank vector, so build it once instead
    # of on every pass through the loop.
    A = s * M + (1 - s) * np.full((n, d), 1 / n)

    r_new = np.repeat(1 / n, n)
    for i in range(max_itr):
        # A.dot() allocates a fresh array, so keeping a reference to the
        # previous vector is enough; no copy is required.
        r_old = r_new
        r_new = A.dot(r_old)
        if np.sum(np.abs(r_new - r_old)) <= maxerr:
            return r_new, i

    # No convergence within max_itr steps (or max_itr == 0).
    return r_new, max_itr

In [73]:
# Input Q1 Transformation Matrix
#
# Column 0 has three non-zero entries, each of which must be exactly 1/3 for
# the column to sum to 1.  The fraction is written directly in the literal
# instead of entering a rounded 0.333 and patching it afterwards.

# NOTE(review): sympy is no longer needed by this cell; the import is kept
# only in case a later cell still refers to `sy`.
import sympy as sy

M = np.array([[0,   0.5, 1, 0],
              [1/3, 0,   0, 0.5],
              [1/3, 0,   0, 0.5],
              [1/3, 0.5, 0, 0]])
print(M)

# Print PageRank of M
MYPageRank(M)

[[0.         0.5        1.         0.        ]
 [0.33333333 0.         0.         0.5       ]
 [0.33333333 0.         0.         0.5       ]
 [0.33333333 0.5        0.         0.        ]]


(array([0.32454707, 0.22515098, 0.22515098, 0.22515098]), 9)

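The iteration stops at loop index 9 (the counter is zero-based, so 10 update steps were performed). The resulting PageRank values for pages A, B, C and D are 0.32454707, 0.22515098, 0.22515098 and 0.22515098 respectively.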

## Q2. MapReduce PageRank

### Loads in inputs

In [35]:
# Initialize the spark context.
# SparkConf and SparkContext are imported here because no earlier cell in the
# notebook imports them; without this the cell raises NameError on a fresh
# kernel.
from pyspark import SparkConf, SparkContext

# Local mode with a single worker thread ("local[1]").
conf = SparkConf().setAppName("PageRank").setMaster("local[1]")
# getOrCreate() reuses an already-running context, so re-running this cell
# does not try to start a second one.
sc = SparkContext.getOrCreate(conf)

In [64]:
# Read the input file as an RDD of raw text lines.
data = sc.textFile('web-BerkStan.txt')
# Preview the first five lines.
data.take(5)

['1\t2', '1\t5', '1\t7', '1\t8', '1\t9']

In [66]:
def splitting(lines):
    """Split one tab-separated line and return its first two fields.

    `lines` is a single line of text (despite the plural name).  Any fields
    after the second are ignored; a line with fewer than two tab-separated
    fields raises IndexError.
    """
    fields = lines.split('\t')
    first, second = fields[0], fields[1]
    return (first, second)
# Turn every raw line into a 2-tuple of its first two tab-separated fields.
data_splitted = data.map(splitting)
# Preview the first five parsed records.
data_splitted.take(5)

[('1', '2'), ('1', '5'), ('1', '7'), ('1', '8'), ('1', '9')]

### PageRank

In [74]:
def computeContribs(urls, rank):
    """Generate (url, share) pairs that split `rank` evenly across `urls`.

    Each entry of `urls` is paired with `rank / len(urls)`.  An empty `urls`
    produces no pairs.
    """
    fan_out = len(urls)
    # The division stays inside the generator expression so that an empty
    # `urls` yields nothing instead of dividing by zero.
    yield from ((target, rank / fan_out) for target in urls)


In [77]:
# Group the parsed (url, neighbor) pairs by url so that each record holds one
# url together with every neighbor it links to.  The result is cached because
# the iteration cell joins against it on every pass.
links = data_splitted.groupByKey().cache()

# Give every url that appears as a key in `links` the same starting rank.
# NOTE(review): 685230 is presumably the total number of pages in the graph;
# confirm against the data set.
initial_rank = 1.0/685230.0
ranks = links.map(lambda entry: (entry[0], initial_rank))

In [78]:
# Preview five (url, initial rank) pairs.
ranks.take(5)

[('4', 1.4593640091648059e-06),
 ('12', 1.4593640091648059e-06),
 ('16', 1.4593640091648059e-06),
 ('444366', 1.4593640091648059e-06),
 ('10', 1.4593640091648059e-06)]

In [59]:
# Calculates and updates URL ranks continuously using PageRank algorithm.
# `add` is passed to reduceByKey below but is not imported by any earlier
# cell, so it is imported here to keep the cell runnable on a fresh kernel.
from operator import add

# NOTE(review): this cell rebinds `ranks`, so running it again continues from
# the previous result instead of restarting from the initial ranks.
# NOTE(review): the initial ranks are 1.0/685230.0, but the constant term
# added below is 0.15, not 0.15 divided by the page count, so the resulting
# values are not on the same scale as the initial ranks.  Left unchanged so
# the recorded results still apply; confirm which normalisation is intended.
for iteration in range(10):
    # Calculates URL contributions to the rank of other URLs: each url's
    # current rank is split evenly across the neighbors it links to.
    contribs = links.join(ranks).flatMap(
        lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))

    # Re-calculates URL ranks based on neighbor contributions: sum the
    # contributions per url, then apply the 0.85 damping factor.
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)



### Rank Top 100

In [60]:
# The 100 (url, rank) pairs with the largest rank, highest first.
ranks.top(100, key=lambda url_rank: url_rank[1])

[('272919', 5135.337685312974),
 ('438238', 3491.0960312742745),
 ('571448', 1864.6376899641693),
 ('601656', 1602.690114457544),
 ('319209', 1399.2280593912456),
 ('316792', 1332.0106563322552),
 ('184094', 1282.3453401223012),
 ('401873', 1180.345170594501),
 ('571447', 1148.8928422268498),
 ('284306', 1138.1227323344524),
 ('768', 1082.2358974713154),
 ('927', 1010.7247942388504),
 ('66244', 973.5471932820052),
 ('68949', 972.971066110794),
 ('68948', 965.9204723030336),
 ('95552', 954.2712527121563),
 ('77284', 954.2712527121563),
 ('86237', 954.2712527121563),
 ('95551', 954.2712527121563),
 ('68947', 954.2712527121563),
 ('96070', 954.2712527121563),
 ('86238', 954.2712527121563),
 ('68946', 954.2712527121563),
 ('86239', 954.2712527121563),
 ('66909', 954.2712527121563),
 ('184142', 781.19488314776),
 ('299039', 770.664978342663),
 ('571451', 749.2130806752585),
 ('570985', 744.4415127078548),
 ('299040', 721.5266967317406),
 ('319412', 717.3411614327924),
 ('184332', 699.269138