# Problema 1
Estudiantes:
- Matías Fuentes
- Larry Uribe

In [None]:
from pyspark.sql import SparkSession
import pandas as pd
import math
spark = SparkSession.builder \
    .getOrCreate()
sc = spark.sparkContext

In [None]:
nodes = [1, 2, 3, 4]
edges = [(1, 2), (2, 3), (2, 4), (3, 2)]
rdd_nodes = sc.parallelize(nodes)
rdd_edges = sc.parallelize(edges)

## Problema 1

1. Prepara un RDD que tenga cada nodo con su Page Rank inicial. Luego, haz una función que prepare el
mensaje que cada nodo va a enviar. Probablemente quieras almacenar estos valores como otro RDD.

Preparemos un RDD que contenga a cada nodo con su Page Rank inicial. El Page Rank inicial está dado por $\frac{1}{N_{nodos}}$

In [None]:
n_nodes = rdd_nodes.count()
# Disponibilizamos la cantidad de nodos para todos los workers
bc_n_nodes = sc.broadcast(n_nodes)

rdd_ini = rdd_nodes.map(lambda x: (x, 1/bc_n_nodes.value))
rdd_ini.collect()

Para el paso siguiente necesitaremos la cantidad de vecinos

In [None]:
neigh_counts = rdd_edges.countByKey()
# Disponibilizamos la cantidad de mensajes enviados por nodo para todos los workers
bc_neigh_counts = sc.broadcast(neigh_counts) 
bc_neigh_counts.value

El mensaje saliente de cada nodo será el mensaje inicial dividido por la cantidad de vecinos. Vemos que solo el nodo 2 tiene más de 1 vecino, por tanto el único mensaje que será modificado es el suyo.

In [None]:
def prepare_node_msg(node, init_msg):
    if neigh_counts[node] != 0:
        message = init_msg / bc_neigh_counts.value[node]
        return message
    return init_msg

def preprare_rdd_msg(rdd_ini):
    rdd_prep_msg = rdd_ini.map(lambda x: (x[0], prepare_node_msg(x[0], x[1])))
    return rdd_prep_msg

rdd_prep_msg = preprare_rdd_msg(rdd_ini)
rdd_prep_msg.collect()

2. Función que envía los mensajes a los nodos correspondientes y se hace cargo del merge de los mensajes recibidos por cada nodo. Debe retornar un RDD que para cada nodo diga cuál es el mensaje final recibido.

Primero hacemos un join. Las llaves son los nodos emisores, y los valores resultantes serán tuplas donde el primer elemento es el nodo receptor y el segundo elemento es el mensaje recibido.

Luego, generamos otro RDD que contenga solo los valores de la operación anterior y aplicamos reduceByKey para sumar los mensajes, obteniendo así tuplas donde la llave es el nodo receptor y el valor es la suma de todos los mensajes recibidos.

In [None]:
def send_msg(rdd_edges, rdd_prep_msg):
    rdd_msg_sent = rdd_edges.join(rdd_prep_msg)
    rdd_received_msg = rdd_msg_sent.values().reduceByKey(lambda x, y: x + y)
    return rdd_received_msg
rdd_received_msg = send_msg(rdd_edges, rdd_prep_msg)
rdd_received_msg.collect()

3. Haz una función que actualice el valor de Page Rank para cada nodo considerando el damping factor.
Probablemente quieras hacer una función que tome el output del punto anterior y lo procese.

In [None]:
# Disponibilizamos el damping factor para todos los workers
damping = 0.85
bc_damping = sc.broadcast(damping)
def update_pr(rdd_received_msg):
    rdd_updated_pr = rdd_received_msg.mapValues(lambda x: x * bc_damping.value + (1-bc_damping.value)/bc_n_nodes.value)
    return rdd_updated_pr
rdd_updated_pr = update_pr(rdd_received_msg)
rdd_updated_pr.collect()

4. Itera los pasos correspondientes por un número máximo de iteraciones, o hasta que la diferencia entre dos iteraciones del valor de Page Rank sea mínima.

Para facilitar la legibilidad, consolidemos el proceso completo en una función. Usaremos el error absoluto medio como criterio de parada 

In [None]:
def mean_absolute_error(rdd1, rdd2):
    abs_difference = rdd1.join(rdd2).values().map(lambda x: abs(x[0] - x[1]))
    mean_abs_difference = abs_difference.reduce(lambda x, y: x + y)/2
    return mean_abs_difference

def page_rank(rdd_nodes, rdd_edges, damping=0.85, max_iterations = 1000, eps=0.05):
    n_nodes = rdd_nodes.count()
    # Disponibilizamos la cantidad de nodos para todos los workers
    bc_n_nodes = sc.broadcast(n_nodes)
    neigh_counts = rdd_edges.countByKey()
    # Disponibilizamos la cantidad de mensajes enviados por nodo para todos los workers
    bc_neigh_counts = sc.broadcast(neigh_counts) 
    bc_neigh_counts.value
    # Disponibilizamos el damping factor para todos los workers
    bc_damping = sc.broadcast(damping)
    # PageRank inicial
    rdd_ini = rdd_nodes.map(lambda x: (x, 1/bc_n_nodes.value))
    prev_pr = rdd_ini
    # Preparar mensaje inicial
    rdd_prep_msg = preprare_rdd_msg(rdd_ini)
    
    for i in range(max_iterations):
        # Enviar mensaje y gestionar merge al recibir
        rdd_received_msg = send_msg(rdd_edges, rdd_prep_msg)
        # Actualizar PageRank
        rdd_updated_pr = update_pr(rdd_received_msg)
        # Calcular distancia entre el PageRank recién obtenido y el de la iter previa
        mean_distance = mean_absolute_error(prev_pr, rdd_updated_pr)
        # Condición de parada
        if mean_distance < eps:
            break
        # Mensaje enviado para iteración siguiente
        rdd_prep_msg = preprare_rdd_msg(rdd_updated_pr)
        # El PageRank de ahora es el PageRank previo en la próxima iteración
        prev_pr = rdd_updated_pr
        
    print(f'Total iterations: {i+1}')
    return rdd_updated_pr

In [None]:
final_pr = page_rank(rdd_nodes, rdd_edges)
final_pr.collect()

### Prueba con grafo Cora

Cargamos el Grafo

In [None]:
citas = pd.read_csv('cora/cora.cites',sep="\t",
                    header=None,
                    names=["target", "source"])

Extraer nodos y conexiones, y cargarlos a sus respectivos RDD

In [None]:
nodes = list(set(list(citas.target.values) + list(citas.source.values)))
edges = []
for source, target in citas.values:
    edges.append((source, target))
rdd_nodes = sc.parallelize(nodes)
rdd_edges = sc.parallelize(edges)

Correr PageRank

In [None]:
rdd_result = page_rank(rdd_nodes, rdd_edges)