## CS431/631 Data Intensive Distributed Analytics
### Fall 2023 -- Personalized Page Rank
---

#### Overview
For this project, you will be using Python and Spark to perform some graph analysis, using a graph of the Gnutella server network.   In this graph, each node represents a server, and each (directed) edge represents a connection between servers in Gnutella's peer-to-peer network.  The input file for this assignment, `p2p-Gnutella08-adj.txt`, represents the graph as an adjacency list.   Each server (node) is identified by a unique number, and each line in the file gives the adjacency list for a single server.
For example, this line:
> 91	243	1923	2194

gives the adjacency list for server `91`.   It indicates that there are edges from server `91` to servers `243`, `1923`, and `2194`.    According to the Stanford Network Analysis Project, which collected these data, [the graph includes 6301 servers and 20777 edges](http://snap.stanford.edu/data/p2p-Gnutella08.html).
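As a small illustration of the file format, the following plain-Python sketch splits the example line into a node id and its list of out-neighbors. The helper name `parse_adjacency_line` is hypothetical (it is not part of the assignment), and the sketch assumes tab-separated integer ids with no trailing tab.

```python
def parse_adjacency_line(line):
    """Split one tab-separated adjacency-list line into (node, out_neighbors)."""
    fields = line.rstrip("\n").split("\t")
    node = int(fields[0])
    # Everything after the first field is an out-neighbor; a node with no
    # outgoing edges gets an empty list.
    out_neighbors = [int(f) for f in fields[1:]]
    return node, out_neighbors


node, out_neighbors = parse_adjacency_line("91\t243\t1923\t2194")
print(node, out_neighbors)  # 91 [243, 1923, 2194]
```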

Run the following block to install Spark and download the input file.

In [None]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz
!tar xzf spark-3.3.3-bin-hadoop3.tgz
!pip install -q findspark
!wget -q https://student.cs.uwaterloo.ca/~cs451/content/cs431/p2p-Gnutella08-adj.txt

and then create a `SparkContext`:

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.3-bin-hadoop3"

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf

spark_conf = SparkConf()\
  .setAppName("YourTest")\
  .setMaster("local[*]")

sc = SparkContext.getOrCreate(spark_conf)

---
#### Question 1  (6/24 marks):

To get warmed up, you should first write Spark code to confirm or determine some basic properties of the Gnutella graph.  Write code in the provided functions that will return answers to the following questions, as specified in each function's documentation:
- How many nodes and edges are there in the graph?  (This should confirm the numbers given above.)
- How many nodes of each outdegree are there? That is, how many nodes have no outgoing edges, how many have one outgoing edge, how many have two outgoing edges, and so on?
- How many nodes of each indegree are there?

You should use Spark to answer these questions.   Do *not* load the entire graph into your Python driver program.
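To make the expected return values concrete, here is a sketch on a hypothetical four-node toy graph (not the Gnutella data). It uses plain Python only because the toy graph is tiny; your answers for the real graph should still be computed with Spark.

```python
from collections import Counter

# Hypothetical toy graph: node -> list of out-neighbors.
toy_graph = {0: [1, 2], 1: [2], 2: [0], 3: []}

num_nodes = len(toy_graph)
num_edges = sum(len(nbrs) for nbrs in toy_graph.values())

# Outdegree histogram: outdegree -> number of nodes with that outdegree.
out_counts = dict(Counter(len(nbrs) for nbrs in toy_graph.values()))

# Indegree of every node, starting from 0 so that nodes with no incoming
# edges are still counted.
indegree = {node: 0 for node in toy_graph}
for nbrs in toy_graph.values():
    for nbr in nbrs:
        indegree[nbr] += 1

# Indegree histogram: indegree -> number of nodes with that indegree.
in_counts = dict(Counter(indegree.values()))

print((num_nodes, num_edges))
print(out_counts)
print(in_counts)
```

Here node `3` has neither incoming nor outgoing edges, so it contributes to the `0` bucket of both dictionaries.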

In [None]:
from operator import add

def num_nodes_edges():
    """Returns a tuple (num_nodes, num_edges)"""
    doc = sc.textFile("p2p-Gnutella08-adj.txt")
    num_nodes = doc.count()
    num_edges = doc.map(lambda line: line.split("\t")).\
        map(lambda x: len(x) -1).reduce(add)
    return (num_nodes, num_edges)


def out_counts():
    """Returns a dictionary where the keys are the outdegrees, and the
    values are the number of nodes of the corresponding outdegree """
    #### Your code for Question 1.2 should go here
    doc = sc.textFile("p2p-Gnutella08-adj.txt").map(lambda line: line.split("\t"))
    out_deg = doc.map(lambda x: ((len(x) -1),1)).reduceByKey(lambda x,y: x+y)
    return out_deg.collectAsMap()



def in_counts():
    """Returns a dictionary where the keys are the indegrees, and the
    values are the number of nodes of the corresponding indegree """
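    # Flatten every line into node ids: each node appears once as the head of
    # its own line, plus once for every incoming edge.
    doc = sc.textFile("p2p-Gnutella08-adj.txt").flatMap(lambda line: line.split("\t"))
    # Count the appearances of each node, subtract the one head-of-line
    # appearance to get its indegree, then count the nodes per indegree.
    in_deg = doc.map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y).\
        map(lambda x: (x[1]-1,1)).reduceByKey(lambda x,y: x+y)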
    return in_deg.collectAsMap()




---
Your main objective for this assignment is to perform *single source personalized page rank* over the Gnutella graph.  To get started, you should make sure that you have a clear understanding of ordinary (i.e., non-personalized) page rank.  Read the description of page rank in Section 5.3 of [the course textbook](https://lintool.github.io/MapReduceAlgorithms/index.html).   Personalized page rank is like ordinary page rank except:
- One node in the graph is designated as the *source* node. Personalized page rank is performed with respect to that source node.
- Personalized page rank is initialized by assigning all probability mass to the source node, and none to the other nodes. In contrast, ordinary page rank is initialized by giving all nodes the same probability mass.
- Whenever personalized page rank makes a random jump, it jumps back to the source node. In contrast, ordinary page rank may jump to any node.
- In personalized page rank, all probability mass lost at dangling nodes is put back into the source node.  In ordinary page rank, lost mass is distributed evenly over all nodes.
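The differences listed above can be sketched as a single iteration in plain Python. This is only an illustration on a hypothetical toy graph, not the Spark solution: the name `ppr_step` and the dictionary representation are assumptions, and the sketch assumes every out-neighbor also appears as a key of the graph.

```python
def ppr_step(graph, ranks, source, jump_factor):
    """One personalized page rank iteration over a dict-based toy graph."""
    new_ranks = {node: 0.0 for node in graph}
    for node, out_neighbors in graph.items():
        if out_neighbors:
            # Mass that follows links is split evenly over the out-neighbors.
            share = (1 - jump_factor) * ranks[node] / len(out_neighbors)
            for nbr in out_neighbors:
                new_ranks[nbr] += share
    # Whatever was not passed along an edge (random jumps plus the mass
    # sitting on dangling nodes) goes back to the source node.
    new_ranks[source] += 1 - sum(new_ranks.values())
    return new_ranks


# Hypothetical toy graph; node 3 is dangling and unreachable.
toy_graph = {0: [1, 2], 1: [2], 2: [0], 3: []}
# Initialization: all probability mass on the source node (node 0).
ranks = {0: 1.0, 1: 0.0, 2: 0.0, 3: 0.0}

ranks = ppr_step(toy_graph, ranks, source=0, jump_factor=0.5)
print(ranks)
```

Note that the source keeps any mass sent to it by its in-neighbors and receives the jumped and dangling mass on top of that, so the ranks still sum to 1 after every iteration.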

#### Question 2  (10/24 marks):

Your task is to write a Spark program to perform personalized page rank over the Gnutella graph for a specified number of iterations, with respect to a specified source node. The function you will implement takes three input values:
- source node id (a non-negative integer)
- iteration count (a positive integer)
- random jump factor value (a float between 0 and 1) - This is 1-B as introduced in the lecture.

The function should perform personalized page rank, with respect to the specified source node, over the Gnutella graph, for the specified number of iterations, using Spark.
The output of your function should be a list of the 10 nodes with the highest personalized page rank with respect to the given source. For each of the 10 nodes, return the node's id and page rank value as a tuple. The list returned by the function should therefore look something like this: `[(node_id_1, highest_pagerank_value), ..., (node_id_10, 10th_highest_pagerank_value)]`
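The shape of the return value can be sketched in plain Python. The rank values below are hypothetical, and `top_k` is an illustrative helper rather than part of the assignment; it uses the standard library's `heapq.nlargest` to pick the highest-ranked entries without sorting everything.

```python
import heapq


def top_k(ranks, k=10):
    """Return the k (node_id, rank) pairs with the largest rank, highest first."""
    return heapq.nlargest(k, ranks.items(), key=lambda pair: pair[1])


# Hypothetical ranks for a four-node toy graph.
toy_ranks = {0: 0.625, 1: 0.125, 2: 0.25, 3: 0.0}
print(top_k(toy_ranks, k=2))  # [(0, 0.625), (2, 0.25)]
```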

In [None]:
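# helper function: out_nbs is the adjacency list [node, out-neighbor_1, ..., out-neighbor_k]
#   yields (node, 0) so that every node stays in the rank RDD, and an equal
#   share (1 - jump_factor) * rank / k of the node's mass for each out-neighbor
def contri(out_nbs, rank, jump_factor):
    # the node itself receives nothing from its own line
    yield (out_nbs[0], 0)
    num_nbs = len(out_nbs) - 1
    for nb in out_nbs[1:]:
        yield (nb, (1 - jump_factor) * rank / num_nbs)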


def personalized_page_rank(source_node_id, num_iterations, jump_factor):
    """Returns a list of the 10 nodes with the highest page rank value along with their value, as tuples
    [(node_id_1, highest_pagerank_value), ..., (node_id_10, 10th_highest_pagerank_value)]"""
    # your solution to Question 2 here
    doc = sc.textFile("p2p-Gnutella08-adj.txt").map(lambda line: line.split("\t"))
    # each list is in the form like
    #     (node, [node, out-neighbor_1, ..., out-neighbor_k])
    listdoc = doc.map(lambda x: (x[0], list(x)))
    # initialize source node with mass 1 and other node with mass 0
    ranklis = listdoc.map(lambda x:(x[0],1) if x[0] == str(source_node_id) else (x[0],0))
    for iteration in range(num_iterations):
      # (node, (adjacency list, rank of node))
      comb = listdoc.join(ranklis)
      # distribute the mass to out-neighbors
      degs = comb.map(lambda x:x[1]).flatMap(lambda x: contri(x[0],x[1],jump_factor))
      # mass not passed along edges (random jumps and dangling nodes)
      loss = 1 - degs.map(lambda x: x[1]).sum()
      # update rank: the source keeps the mass sent by its in-neighbors and
      # also receives all of the jumped and dangling mass
      ranklis = degs.reduceByKey(lambda x,y:x+y).\
          map(lambda x: (x[0], x[1] + loss) if x[0] == str(source_node_id) else x)
    # sort by rank and take top 10
    return ranklis.sortBy(lambda x: x[1], ascending = False).\
          map(lambda x: (int(x[0]), x[1])).take(10)



[(0, 0.3405274579137627),
 (9, 0.03250507933815258),
 (5, 0.03247639940230466),
 (7, 0.032362487667159025),
 (4, 0.0322172720112759),
 (3, 0.031500033458619965),
 (8, 0.031488430803692694),
 (10, 0.030872138035464868),
 (2, 0.030652673309598692),
 (6, 0.03065179010587444)]

---
#### Question 3  (4/24 marks):

For the previous question, you implemented personalized page rank that ran for a specified number of iterations.  However, it is also common to write iterative algorithms that run until some specified termination condition is reached.
For example, for page rank, suppose $p_i(x)$ represents the probability mass assigned to node $x$ after the $i$th iteration of the algorithm.  ($p_0(x)$ is the initial probability mass of node $x$.)   We define the change of $x$'s probability mass on the $i$th iteration as $\lvert p_i(x)-p_{i-1}(x) \rvert$.   Then, we can iterate personalized page rank until the maximum (over all nodes) change is less than a specified threshold, i.e., until all nodes' page ranks have converged.

For this question, modify your personalized page rank implementation from Question 2 so that it iterates until the
maximum node change is less than $\frac{0.5}{N}$, where $N$ represents the number of nodes in the graph.
This version of the function should take only two inputs: the source node id and the random jump factor.
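The stopping test described above can be sketched in plain Python on two consecutive rank assignments. The helper names `max_change` and `has_converged` and the rank values are hypothetical, chosen for a four-node toy graph so that the threshold is $\frac{0.5}{4} = 0.125$.

```python
def max_change(prev_ranks, curr_ranks):
    """Largest absolute change |p_i(x) - p_{i-1}(x)| over all nodes x."""
    return max(abs(curr_ranks[x] - prev_ranks[x]) for x in curr_ranks)


def has_converged(prev_ranks, curr_ranks, num_nodes):
    """True when the maximum change is strictly below 0.5 / N."""
    return max_change(prev_ranks, curr_ranks) < 0.5 / num_nodes


# Hypothetical ranks after three consecutive iterations on a 4-node graph.
p1 = {0: 0.5, 1: 0.25, 2: 0.25, 3: 0.0}
p2 = {0: 0.625, 1: 0.125, 2: 0.25, 3: 0.0}
p3 = {0: 0.625, 1: 0.15625, 2: 0.21875, 3: 0.0}

print(max_change(p1, p2), has_converged(p1, p2, 4))  # 0.125 False
print(max_change(p2, p3), has_converged(p2, p3, 4))  # 0.03125 True
```

The first comparison shows why the strict inequality matters: a maximum change exactly equal to the threshold does not stop the iteration.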

In [None]:
def personalized_page_rank_stopping_criterion(source_node_id, jump_factor):
    """Returns a list of the 10 nodes with the highest page rank value along with their value, as tuples
    [(node_id_1, highest_pagerank_value), ..., (node_id_10, 10th_highest_pagerank_value)]"""
    doc = sc.textFile("p2p-Gnutella08-adj.txt")
    N = doc.count()
    doc = doc.map(lambda line: line.split("\t"))
    # each list is in the form like
    #     (node, [node, out-neighbor_1, ..., out-neighbor_k])
    listdoc = doc.map(lambda x: (x[0], list(x)))
    listdoc = listdoc.repartition(listdoc.getNumPartitions()).cache()
    # initialize source node with mass 1 and other node with mass 0
    ranklis = listdoc.map(lambda x:(x[0],1) if x[0] == str(source_node_id) else (x[0],0))
    change = 1
    while change >= 0.5/N:
      ranklis_prev = ranklis
      # (node, (adjacency list, rank of node))
      comb = listdoc.join(ranklis)
      # distribute the mass to out-neighbors
      degs = comb.map(lambda x:x[1]).flatMap(lambda x: contri(x[0],x[1],jump_factor))
      # mass not passed along edges (random jumps and dangling nodes)
      loss = 1 - degs.map(lambda x: x[1]).sum()
      # update rank: the source keeps the mass sent by its in-neighbors and
      # also receives all of the jumped and dangling mass
      ranklis = degs.reduceByKey(lambda x,y:x+y).\
          map(lambda x: (x[0], x[1] + loss) if x[0] == str(source_node_id) else x)
      # largest absolute change over all nodes; the join yields
      # (node, (rank_now, prev_rank))
      change = ranklis.join(ranklis_prev).map(lambda x: abs(x[1][0]-x[1][1])).max()
    _ = listdoc.unpersist()
    # sort by rank and take top 10
    return ranklis.sortBy(lambda x: x[1], ascending = False).\
        map(lambda x: (int(x[0]), x[1])).take(10)



[(0, 0.3405754456208271),
 (9, 0.032488188457622524),
 (5, 0.032462926597637755),
 (7, 0.032348695688724526),
 (4, 0.03220195432732894),
 (3, 0.03148505217375426),
 (8, 0.03147346845304872),
 (10, 0.030858384977842938),
 (2, 0.03063851843475855),
 (1, 0.030637624129176883)]