<a href="https://colab.research.google.com/github/sujikathir/Page-Rank-using-Parallel-processing/blob/main/Page%20Rank%20using%20Map%20Reduce%20and%20Iterative%20model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
#map_reduce.py
# Defines a single function, map_reduce, which takes an input
# dictionary i and applies the user-defined function mapper to each
# (input_key,input_value) pair, producing a list of intermediate 
# keys and intermediate values.  Repeated intermediate keys then 
# have their values grouped into a list, and the user-defined 
# function reducer is applied to the intermediate key and list of 
# intermediate values.  The results are returned as a list.

import itertools

def map_reduce(i,mapper,reducer):
  """Run a single-process MapReduce job over the dictionary ``i``.

  ``mapper(key, value)`` is called for every item of ``i`` and must return
  an iterable of ``(intermediate_key, intermediate_value)`` pairs.  All
  pairs are grouped by intermediate key, and ``reducer(intermediate_key,
  values)`` is called once per distinct key with the list of its values.

  Returns the list of reducer results, ordered by ascending intermediate
  key.  Within a group the values keep the order in which the mappers
  emitted them.  Intermediate keys must be mutually orderable; values do
  not need to be.
  """
  intermediate = []
  for key, value in i.items():
    intermediate.extend(mapper(key, value))

  def intermediate_key(pair):
    return pair[0]

  # Sort on the key only.  Sorting whole (key, value) pairs would compare
  # values whenever keys tie, raising TypeError for unorderable values such
  # as dicts.  list.sort is stable, so each group keeps emission order.
  intermediate.sort(key=intermediate_key)
  return [reducer(key, [value for _, value in group])
          for key, group in itertools.groupby(intermediate,
                                              key=intermediate_key)]

In [14]:
#pagerank_mr.py
#
# Computes PageRank, using a simple MapReduce library.
#
# MapReduce is used in two separate ways: (1) to compute
# the inner product between the vector of dangling pages
# (i.e., pages with no outbound links) and the current
# estimated PageRank vector; and (2) to actually carry
# out the update of the estimated PageRank vector.
#
# For a web of one million webpages the program consumed
# about 1 GB of RAM and took an hour or so to run on a
# (slow) laptop with 3 GB of RAM, running Windows Vista and
# Python 2.5; timings on current hardware and Python 3 may differ.

#import map_reduce
import numpy.random
import random

def paretosample(n,power=2.0):
  """Draw one sample from a Pareto (Zipf) distribution truncated at ``n``.

  The returned value l has probability mass proportional to 1/l^power on
  1 <= l <= n.  Draws larger than ``n`` are rejected and redrawn.

  Args:
    n: truncation point.  Must be at least 1; otherwise no draw could ever
      be accepted and the rejection loop would never terminate.
    power: exponent of the distribution.  numpy.random.zipf requires it to
      be greater than 1.

  Returns:
    An int in the range [1, n].

  Raises:
    ValueError: if ``n < 1``.
  """
  if n < 1:
    raise ValueError("paretosample requires n >= 1, got %r" % (n,))
  while True:
    m = numpy.random.zipf(power)
    if m <= n:
      return int(m)

def initialize(n,power):
  """Build a random web of ``n`` pages as the PageRank input dictionary.

  Each page k is linked to by L_k distinct pages other than k itself, so
  there are no self-links.  The L_k are independent and identically
  distributed, with a shifted and truncated Pareto probability mass
  function p(l) proportional to 1/(l+1)^power on 0 <= l <= n-1.  The upper
  bound n-1 is the number of other pages available.

  The result is a dictionary with keys 0 through n-1, one per page:
    i[j][0] is the estimated PageRank, initially 1/n;
    i[j][1] is the number of outlinks of page j;
    i[j][2] is the list of pages that page j links to.
  This dictionary supplies the (key, value) pairs to both mapper tasks.
  """
  i = {j: [1.0/n, 0, []] for j in range(n)}

  # The Pareto distribution governs INlinks, but the representation stores
  # OUTlinks, so each sampled link j -> k is recorded on the source page j.
  # A representation giving easy access to both directions would avoid
  # this, at some memory cost.
  for k in range(n):
    lk = int(paretosample(n, power)) - 1
    # Pick lk distinct pages among the n-1 pages other than k.  Draw from
    # range(n-1) and shift every index >= k up by one, which skips k
    # without building an O(n) candidate list for each page.
    for r in random.sample(range(n - 1), lk):
      j = r if r < k else r + 1
      i[j][1] += 1 # increment the outlink count for page j
      i[j][2].append(k) # insert the link from j to k
  return i

def ip_mapper(input_key,input_value):
  """Map step of the dangling-page inner product.

  ``input_value`` is a page record ``[rank, outlink_count, outlinks]``.  A
  page with zero outlinks (a dangling page) produces the single pair
  ``(1, rank)``; every other page produces nothing.  Using the constant
  key 1 sends all dangling ranks to one reducer call.  ``input_key`` is
  not used.
  """
  rank, outlink_count = input_value[0], input_value[1]
  return [(1, rank)] if outlink_count == 0 else []

def ip_reducer(input_key,input_value_list):
  """Reduce step of the dangling-page inner product.

  Returns the total of ``input_value_list``, i.e. the summed PageRank of
  all dangling pages (0 for an empty list).  ``input_key`` is ignored.
  """
  dangling_total = sum(input_value_list)
  return dangling_total

def pr_mapper(input_key,input_value):
  """Map step of the PageRank update.

  ``input_value`` is a page record ``[rank, outlink_count, outlinks]``.
  The output starts with ``(input_key, 0.0)`` and then has one pair
  ``(target, rank / outlink_count)`` for every outlinked target.  The
  zero-valued self pair guarantees that every page, even one nothing links
  to, yields at least one intermediate pair and so reaches the reducer.
  """
  rank, outlink_count, outlinks = input_value[0], input_value[1], input_value[2]
  emitted = [(input_key, 0.0)]
  # Divide inside the loop so a page without outlinks never divides by 0.
  for target in outlinks:
    emitted.append((target, rank / outlink_count))
  return emitted

def pr_reducer_inter(intermediate_key,intermediate_value_list,
                     s,ip,n):
  """Compute one page's updated PageRank (helper behind the real reducer).

  Beyond the usual reducer inputs it takes three extra arguments:
    s:  the PageRank damping parameter;
    ip: the inner product of the dangling-page vector with the current
        PageRank estimate (total rank held by dangling pages);
    n:  the number of pages.
  Returns ``(intermediate_key, new_rank)`` where
    new_rank = s*sum(incoming shares) + s*ip/n + (1-s)/n.
  """
  link_mass = s*sum(intermediate_value_list)
  dangling_mass = s*ip/n        # dangling rank spread evenly over all pages
  teleport_mass = (1.0-s)/n     # random-jump contribution
  return intermediate_key, link_mass + dangling_mass + teleport_mass

def pagerank(i,s=0.85,tolerance=0.00001,max_iter=None):
  """Compute PageRank for the web ``i`` using two MapReduce jobs per step.

  ``i`` maps each page key to ``[rank, outlink_count, outlinks]``, as built
  by ``initialize``.  Every iteration does two things:
    1. sums the PageRank held by dangling pages (pages with no outlinks);
    2. applies the damped update with parameter ``s`` to every page.
  Iteration continues while the l1 norm of the change between successive
  estimates, M^(j+1)P - M^jP, exceeds ``tolerance``.

  Progress is printed every iteration.  Ranks are updated in place in
  ``i[page][0]``, and ``i`` itself is returned.  Reducer results are
  matched back to pages by key rather than by list position, so the keys
  need not be 0..n-1.  They must still be mutually orderable, because
  map_reduce sorts them.

  Args:
    i: the web dictionary; modified in place.
    s: damping parameter.
    tolerance: convergence threshold on the l1 change.
    max_iter: optional cap on the number of iterations.  ``None`` (the
      default) keeps iterating until convergence.

  Returns:
    ``i``, with the converged ranks stored in ``i[page][0]``.

  Raises:
    RuntimeError: if ``max_iter`` iterations complete without converging.
  """
  n = len(i)
  iteration = 1
  change = 2 # initial estimate of error, large enough to enter the loop
  while change > tolerance:
    if max_iter is not None and iteration > max_iter:
      raise RuntimeError("pagerank did not converge in %d iterations"
                         % max_iter)
    print("Iteration: "+str(iteration))

    # Job 1: inner product between the vector of dangling pages and the
    # current estimate.  map_reduce returns [] when no page is dangling,
    # otherwise a single-element list.
    ip_list = map_reduce(i,ip_mapper,ip_reducer)
    ip = ip_list[0] if ip_list else 0

    # The reducer for job 2 needs the current s, ip and n in addition to
    # the standard (key, values) arguments; bind them here.
    def pr_reducer(key, values):
      return pr_reducer_inter(key, values, s, ip, n)

    # Job 2: the PageRank update, returned as (page, new_rank) pairs.
    new_rank = dict(map_reduce(i,pr_mapper,pr_reducer))

    # New estimate of error: l1 norm of the change.
    change = sum([abs(new_rank[page]-i[page][0]) for page in i])
    print("Change in l1 norm: "+str(change))

    # Update the estimated PageRank vector in place.
    for page in i:
      i[page][0] = new_rank[page]
    iteration += 1
  return i

# Build a random 1000-page web (inlink counts drawn from a truncated
# Pareto distribution with power 2.0) and run the MapReduce PageRank on
# it.  pagerank updates i in place and returns it, so new_i is i.
n = 1000  # works up to about 1000000 pages
i = initialize(n, power=2.0)
new_i = pagerank(i, s=0.85, tolerance=0.0001)

Iteration: 1
Change in l1 norm: 1.1734960761904791
Iteration: 2
Change in l1 norm: 0.5373847834016119
Iteration: 3
Change in l1 norm: 0.27438467564810015
Iteration: 4
Change in l1 norm: 0.1637817352221548
Iteration: 5
Change in l1 norm: 0.0897647588863386
Iteration: 6
Change in l1 norm: 0.05322774757919292
Iteration: 7
Change in l1 norm: 0.03256884058288466
Iteration: 8
Change in l1 norm: 0.019112469155368753
Iteration: 9
Change in l1 norm: 0.010268517960149883
Iteration: 10
Change in l1 norm: 0.005840204250583992
Iteration: 11
Change in l1 norm: 0.0034129330923384346
Iteration: 12
Change in l1 norm: 0.0018961608977573763
Iteration: 13
Change in l1 norm: 0.0010526496331738125
Iteration: 14
Change in l1 norm: 0.0006706209179380719
Iteration: 15
Change in l1 norm: 0.00039262350175472656
Iteration: 16
Change in l1 norm: 0.0002255210265589825
Iteration: 17
Change in l1 norm: 0.00012881402763106447
Iteration: 18
Change in l1 norm: 7.547542472012173e-05


In [15]:
def pagerank(G, alpha=0.85, personalization=None,
             max_iter=100, tol=1.0e-6, nstart=None, weight='weight',
             dangling=None):
    """Return the PageRank of the nodes in the graph.

    PageRank computes a ranking of the nodes in the graph G based on
    the structure of the incoming links. It was originally designed as
    an algorithm to rank web pages.

    Parameters
    ----------
    G : graph
        A NetworkX graph. Undirected graphs will be converted to a directed
        graph with two directed edges for each undirected edge.

    alpha : float, optional
        Damping parameter for PageRank, default=0.85.

    personalization : dict, optional
        The "personalization vector" consisting of a dictionary with a
        key for every graph node and nonzero personalization value for each
        node. The values are normalized to sum to 1. By default, a uniform
        distribution is used.

    max_iter : integer, optional
        Maximum number of iterations in power method eigenvalue solver.

    tol : float, optional
        Error tolerance used to check convergence in power method solver.

    nstart : dictionary, optional
        Starting value of PageRank iteration for each node. The values are
        normalized to sum to 1.

    weight : key, optional
        Edge data key to use as weight. If None weights are set to 1.

    dangling : dict, optional
        The outedges to be assigned to any "dangling" nodes, i.e., nodes
        without any outedges. The dict key is the node the outedge points to
        and the dict value is the weight of that outedge. By default,
        dangling nodes are given outedges according to the personalization
        vector (uniform if not specified). This must be selected to result
        in an irreducible transition matrix (see notes under google_matrix).
        It is common for the dangling dict to be the same as the
        personalization dict.

    Returns
    -------
    pagerank : dictionary
        Dictionary of nodes with PageRank as value

    Raises
    ------
    NetworkXError
        If ``personalization`` or ``dangling`` lacks a value for some node of
        G, or if the power iteration has not converged after ``max_iter``
        iterations.

    Notes
    -----
    The eigenvector calculation is done by the power iteration method
    and has no guarantee of convergence. Iteration stops as soon as the
    l1 change between successive iterates drops below
    number_of_nodes(G)*tol; if that does not happen within max_iter
    iterations, NetworkXError is raised.

    The PageRank algorithm was designed for directed graphs. An undirected
    input graph is converted with ``G.to_directed()``, which replaces each
    undirected edge by two directed edges.
    """
    # Imported locally so the function does not depend on a module-level
    # `nx` having been imported before it is called; this also provides
    # NetworkXError, which was otherwise an undefined name.
    import networkx as nx

    if len(G) == 0:
        return {}

    D = G if G.is_directed() else G.to_directed()

    # Create a copy in (right) stochastic form
    W = nx.stochastic_graph(D, weight=weight)
    N = W.number_of_nodes()

    # Choose fixed starting vector if not given
    if nstart is None:
        x = dict.fromkeys(W, 1.0 / N)
    else:
        # Normalized nstart vector
        s = float(sum(nstart.values()))
        x = {k: v / s for k, v in nstart.items()}

    if personalization is None:
        # Assign uniform personalization vector if not given
        p = dict.fromkeys(W, 1.0 / N)
    else:
        missing = set(G) - set(personalization)
        if missing:
            raise nx.NetworkXError('Personalization dictionary '
                                   'must have a value for every node. '
                                   'Missing nodes %s' % missing)
        s = float(sum(personalization.values()))
        p = {k: v / s for k, v in personalization.items()}

    if dangling is None:
        # Use personalization vector if dangling vector not specified
        dangling_weights = p
    else:
        missing = set(G) - set(dangling)
        if missing:
            raise nx.NetworkXError('Dangling node dictionary '
                                   'must have a value for every node. '
                                   'Missing nodes %s' % missing)
        s = float(sum(dangling.values()))
        dangling_weights = {k: v / s for k, v in dangling.items()}
    dangling_nodes = [n for n in W if W.out_degree(n, weight=weight) == 0.0]

    # power iteration: make up to max_iter iterations
    for _ in range(max_iter):
        xlast = x
        x = dict.fromkeys(xlast.keys(), 0)
        danglesum = alpha * sum(xlast[n] for n in dangling_nodes)
        for n in x:
            # this matrix multiply looks odd because it is
            # doing a left multiply x^T=xlast^T*W
            for nbr in W[n]:
                x[nbr] += alpha * xlast[n] * W[n][nbr][weight]
            x[n] += danglesum * dangling_weights[n] + (1.0 - alpha) * p[n]

        # check convergence, l1 norm
        err = sum([abs(x[n] - xlast[n]) for n in x])
        if err < N * tol:
            return x
    raise nx.NetworkXError('pagerank: power iteration failed to converge '
                           'in %d iterations.' % max_iter)


In [16]:
# Demo: PageRank (alpha=0.4) of a 60-node Barabasi-Albert graph using the
# networkx library implementation.  Written as plain Python statements;
# the `>>>` REPL prompts are only accepted by IPython, not by Python itself.
import networkx as nx
G = nx.barabasi_albert_graph(60, 41)
pr = nx.pagerank(G, 0.4)
pr  # shown as the cell output in a notebook; no effect in a plain script


{0: 0.012762116280188423,
 1: 0.012562687224119184,
 2: 0.0129711710223643,
 3: 0.013365134775054436,
 4: 0.013779898628840296,
 5: 0.012971389152932836,
 6: 0.013157950185175158,
 7: 0.012772899264751591,
 8: 0.012380359557936467,
 9: 0.012759639719288408,
 10: 0.012588768676461342,
 11: 0.013384431289396613,
 12: 0.01296107858897791,
 13: 0.013159473196795958,
 14: 0.012769225546855747,
 15: 0.012362901900532306,
 16: 0.012985477629885585,
 17: 0.013565853212868122,
 18: 0.013576473560975456,
 19: 0.013570168680337925,
 20: 0.013168042539239382,
 21: 0.013378300481784863,
 22: 0.012757855624873271,
 23: 0.0127693662085754,
 24: 0.01216664954385023,
 25: 0.01358142778182973,
 26: 0.012375013757911441,
 27: 0.013170979395513673,
 28: 0.01315882900411266,
 29: 0.012970384573172017,
 30: 0.013371774072519878,
 31: 0.01215716949222956,
 32: 0.012978271748507937,
 33: 0.012784361399091915,
 34: 0.013174837466978983,
 35: 0.012575164355548596,
 36: 0.012963473471386579,
 37: 0.0125772496472