# Tarea 2
## IIC2440 - Procesamiento de Datos Masivos

Integrantes:
- Rodrigo Nahum
- Fernando Quintana

## Parte 2: Single Source Shortest Path

## Setup

Primero, instalamos pyspark

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=0b842c6cb2b7d735a84638ba5b1407cdc8ee5e5e806ea392d028e78560d7e364
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


Importamos PySpark y creamos un Spark Context

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .getOrCreate()

sc = spark.sparkContext

# Input

Definimos un grafo de ejemplo a usar, con una cantidad de nodos y una probabilidad de que cada arista exista

In [97]:
from itertools import permutations
import random
import json

n = 10000

edge_chance = 0.1

node_list = [i + 1 for i in range(n)]

edge_list = []
for edge in permutations(node_list, 2):
    if random.random() < edge_chance:
        edge_list.append(edge)

Definimos el damping factor y pasamos a RDD las listas de nodos y aristas

In [107]:
damping_factor = 0.85

nodes = sc.parallelize(node_list)
node_count = nodes.count()
edges = sc.parallelize(edge_list)

## Inicializacion de nodos

Inicializamos los nodos para Page Rank, seteando sus valores iniciales 1 divido en la cantidad de nodos.

Ademas calculamos la cantidad de aristas que tiene cada nodo, de forma de setear que porcentaje del valor del nodo sale por cada arista.

In [69]:
initial_nodes = nodes.map(lambda node: (node, 1/node_count))
edge_count = edges.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (x[0], 1/x[1]))

## Generacion de mensajes

Para crear los mensajes tomamos los nodos y los unimos con las aristas, tomando el valor actual del nodo y multiplicandolo por el 1/(cantidad de aristas del nodo) las cuales se encuentran en edge_count. Esto nos entrega cuanto del valor actual del nodo va a salir a cada arista que sale de ese nodo

In [73]:
messages = initial_nodes.join(edge_count).map(lambda x: (x[0], x[1][0] * x[1][1]))

Luego tomamos estos mensajes y los unimos con las aristas, obteniendo cuanto llega a cada arista.

In [66]:
exchange = messages.join(edges)

[(1, (0.25, 2)), (2, (0.125, 3)), (2, (0.125, 4)), (3, (0.25, 2))]

## Filtrado de Mensajes

En page Rank no filtramos los mensajes

# Agregacion de mensajes 

Despues de tener los valores que llegan a cada arista lo que hacemos es un map, para dejar solo el nodo y el valor que recibe y luego un reduceByKey donde vamos sumando todos los valores que recibe el nodo desde sus aristas

In [79]:
exchange_reduce = exchange.map(lambda x: (x[1][1], x[1][0])).reduceByKey(lambda x, y: x + y)

[(2, 0.5), (3, 0.125), (4, 0.125)]

Ademas calculamos el total page rank, el cual nos servira mas adelante para hacer una normalizacion sobre el page rank total

In [77]:
total_page_rank = exchange_reduce.map(lambda x: x[1]).reduce(lambda x, y : x + y)

In [78]:
exchange_reduce = exchange_reduce.map(lambda x: (x[0], x[1] / total_page_rank))

[(2, 0.6666666666666666), (3, 0.16666666666666666), (4, 0.16666666666666666)]

In [30]:
def compute_final_page_rank(x):
  if(x[1][1] is None):
    return (x[0], (1 - damping_factor)/node_count)
  return (x[0], x[1][1]*damping_factor + (1 - damping_factor)/node_count)

## Update del estado

Para updatear el estado hacemos un leftOuterJoin(ya que un nodo podria no tener aristas entrantes) y calculamos el nuevo page rank aplicando el damping factor con compute_final_page_rank

In [59]:
final_page_rank = initial_nodes.leftOuterJoin(exchange_reduce).map(compute_final_page_rank)
final_page_rank.collect()

[(1, 0.037500000000000006),
 (2, 0.6041666666666666),
 (3, 0.17916666666666667),
 (4, 0.17916666666666667)]

Por ultimo se define una funcion que permite calcular page rank con n iteraciones entregando los nodos como un RDD, los edges con un RDD y el numero de iteraciones.

In [108]:
def compute_final_page_rank(x):
  if(x[1][1] is None):
    return (x[0], (1 - damping_factor)/node_count)
  return (x[0], x[1][1]*damping_factor + (1 - damping_factor)/node_count)

sc.setCheckpointDir("/content/checkpoints")
def computePageRank(nodes, edges, iter):

  node_count = nodes.count()

  initial_nodes = nodes.map(lambda node: (node, 1/node_count)).cache()
  edge_count = edges.map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y).map(lambda x: (x[0], 1/x[1])).cache()
  nodes = initial_nodes

  for i in range(iter):
    print(i)
    messages = nodes.join(edge_count, 8).map(lambda x: (x[0], x[1][0] * x[1][1]))

    exchange = messages.join(edges, 8)

    exchange_reduce = exchange.map(lambda x: (x[1][1], x[1][0])).reduceByKey(lambda x, y: x + y).cache()
    total_page_rank = exchange_reduce.map(lambda x: x[1]).reduce(lambda x, y : x + y)
    exchange_reduce = exchange_reduce.map(lambda x: (x[0], x[1] / total_page_rank))

    nodes = initial_nodes.leftOuterJoin(exchange_reduce, 8).map(compute_final_page_rank).cache()

  return nodes.collect()

page_rank = computePageRank(nodes, edges, 3)

0
1
2
