In [None]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7"

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf

spark_conf = SparkConf()\
  .setAppName("YourTest")\
  .setMaster("local[*]")

sc = SparkContext.getOrCreate(spark_conf)

In [None]:
def num_nodes_edges():
    """Returns a tuple (num_nodes, num_edges)"""
    #### Your code for Question 1.1 should go here

    text = sc.textFile('p2p-Gnutella08-adj.txt')
    adjacency_list = text.map(lambda x: x.split())\
                         .map(lambda text_lst: [int(text) for text in text_lst])\
                         .map(lambda lst: (lst[0], lst[1:]))\
                         .flatMapValues(lambda x: x)\
                         .map(lambda s : tuple(sorted(s)))\
                         .distinct()                     

    num_nodes = text.count()                   
    num_edges = adjacency_list.count()                 
    return (num_nodes, num_edges)
    
    
def out_counts():
    """Returns a dictionary where the keys are the outdegrees, and the 
    values are the number of nodes of the corresponding outdegree """
    #### Your code for Question 1.2 should go here
    text = sc.textFile('p2p-Gnutella08-adj.txt')
    out_count_dict = text.map(lambda x: x.split())\
                         .map(lambda text_lst: [int(text) for text in text_lst])\
                         .map(lambda lst: (lst[0], lst[1:]))\
                         .mapValues(len)\
                         .values()\
                         .map(lambda out : (out, 1))\
                         .reduceByKey(lambda x, y: x+y)
                         
    return out_count_dict.collectAsMap() 
    


def in_counts():
    """Returns a dictionary where the keys are the indegrees, and the 
    values are the number of nodes of the corresponding indegree """
    #### Your code for Question 1.3 should go here
    text = sc.textFile('p2p-Gnutella08-adj.txt')
    in_count_dict = text.map(lambda x: x.split())\
                        .map(lambda text_lst: [int(text) for text in text_lst])\
                        .map(lambda lst: (lst[0], lst[1:]))\
                        .flatMapValues(lambda x: x)\
                        .map(lambda pair: pair[::-1])\
                        .groupByKey()\
                        .mapValues(len)\
                        .values()\
                        .map(lambda out : (out, 1))\
                        .reduceByKey(lambda x, y: x+y)

    return in_count_dict.collectAsMap()

In [None]:
import math
from functools import reduce

def personalized_page_rank(source_node_id, num_iterations, jump_factor):
    """Returns a list of the 10 nodes with the highest page rank value along with their value, as tuples
    [(node_id_1, highest_pagerank_value), ..., (node_id_10, 10th_highest_pagerank_value)]"""
    # your solution to Question 2 here
    text = sc.textFile('p2p-Gnutella08-adj.txt')

    
    adjacency_list = text.map(lambda x: x.split())\
                         .map(lambda text_lst: [int(text) for text in text_lst])\
                         .map(lambda lst: (lst[0], lst[1:]))\
                         .cache()


    # out_importance has ((out_nei, in_neigh), beta*importance)

    out_importance = adjacency_list.filter(lambda x: len(x[1]) != 0)\
                                   .map(lambda x: (x[0], x[1], 1/len(x[1])))\
                                   .flatMap(lambda x : [( (i, x[0]) , (1-jump_factor) * x[2] ) for i in x[1]])\


    # added (1-beta) to the source_node
    # ( in-nei, (out_nei, importance) )
    random_jump = adjacency_list.keys()\
                                .map(lambda x : ((source_node_id, x), jump_factor))\
                                .union(out_importance)\
                                .reduceByKey(lambda x, y: x+y)\
                                .map(lambda x : (x[0][1], (x[0][0], x[1])))\
                                .groupByKey()\
                                .mapValues(list)



    # solve dead ends

    dead_ends = adjacency_list.filter(lambda x : len(x[1]) == 0).keys().collect()

    
    # final_matrix adds prob mass lost by dead ends
    # ( in_nei, (out_nei, importance) )

    final_matrix = random_jump.map(lambda x: (x[0], [( i[0],i[1] + 1 - sum(a[1] for a in x[1]) ) if i[0] == source_node_id else i for i in x[1]])\
                                   if x[0] in dead_ends else x)\
                              .flatMap(lambda x : [ ( x[0], (i[0], i[1]) ) for i in x[1] ])\



    rank_val = adjacency_list.keys()\
                              .map(lambda x : (x, 0) if x != source_node_id else (x, 1))\


    for i in range(num_iterations):
      rank_val = final_matrix.join(rank_val)\
                             .map(lambda x: (x[1][0][0], x[1][0][1]*x[1][1]))\
                             .reduceByKey(lambda x,y: x+y)\




    return rank_val.sortBy(lambda x: x[1], ascending = False).take(10)

In [None]:
def personalized_page_rank_stopping_criterion(source_node_id, jump_factor):
    """Returns a list of the 10 nodes with the highest page rank value along with their value, as tuples
    [(node_id_1, highest_pagerank_value), ..., (node_id_10, 10th_highest_pagerank_value)]"""
    # your solution to Question 3 here
    text = sc.textFile('p2p-Gnutella08-adj.txt')

    
    adjacency_list = text.map(lambda x: x.split())\
                         .map(lambda text_lst: [int(text) for text in text_lst])\
                         .map(lambda lst: (lst[0], lst[1:]))\
                         .cache()


    # out_importance has ((out_nei, in_neigh), beta*importance)

    out_importance = adjacency_list.filter(lambda x: len(x[1]) != 0)\
                                   .map(lambda x: (x[0], x[1], 1/len(x[1])))\
                                   .flatMap(lambda x : [( (i, x[0]) , (1-jump_factor) * x[2] ) for i in x[1]])



    # added (1-beta) to the source_node
    # ( in-nei, (out_nei, importance) )

    random_jump = adjacency_list.keys()\
                                .map(lambda x : ((source_node_id, x), jump_factor))\
                                .union(out_importance)\
                                .reduceByKey(lambda x, y: x+y)\
                                .map(lambda x : (x[0][1], (x[0][0], x[1])))\
                                .groupByKey()\
                                .mapValues(list)\
                                .cache()



    # solve dead ends

    dead_ends = adjacency_list.filter(lambda x : len(x[1]) == 0).keys().collect()

    
    # final_matrix adds prob mass lost by dead ends
    # ( in_nei, (out_nei, importance) )

    final_matrix = random_jump.map(lambda x: (x[0], [( i[0],i[1] + 1 - sum(a[1] for a in x[1]) ) if i[0] == source_node_id else i for i in x[1]])\
                                   if x[0] in dead_ends else x)\
                              .flatMap(lambda x : [ ( x[0], (i[0], i[1]) ) for i in x[1] ])\



    rank_val = adjacency_list.keys()\
                              .map(lambda x : (x, 0) if x != source_node_id else (x, 1))\

    N = adjacency_list.count()
    max_rank_diff = 0.5/N

                            
    while max_rank_diff >= 0.5/N:
      prev_rank_val = rank_val
      rank_val = final_matrix.join(prev_rank_val)\
                             .map(lambda x: (x[1][0][0], x[1][0][1]*x[1][1]))\
                             .reduceByKey(lambda x,y: x+y)\
      
      max_rank_diff = rank_val.union(prev_rank_val)\
                              .reduceByKey(lambda x,y: abs(x-y))\
                              .values()\
                              .max()


    return rank_val.sortBy(lambda x: x[1], ascending = False).take(10)