# Tarea 2 PDM - Vicente Espinoza y Marcelo Vargas - Pregunta 1

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=059b9a91399cf796e9788b873c34a0b181850bd8370643d595069975cc5f8383
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
from pyspark import SparkContext

# Crear un SparkContext
sc = SparkContext("local", "PageRank")

In [318]:
# Función que para un nodo crea el mensaje a enviar como la división de su PageRank con la cantidad de sus vecinos
def prepare_message(x):
  node, current_rank, vecinos = x[0], x[1][0], x[1][1]
  if type(vecinos) is not tuple:
    if vecinos is None:
      vecinos = (node,)
    else:
      vecinos = (vecinos, )
  message = current_rank/len(vecinos)
  return tuple((vecino, message) for vecino in vecinos)

# Función que ejecuta la creación de los mensajes y los sumas de acuerdo al nodo que los recibió
def send_messages(nodes, node_rank):
  starting_node_sum = nodes.map(lambda x: (x, 0))
  node_w_neighbors = edges.reduceByKey(lambda x, y: (x,) + (y,))
  rank_w_neighbors = node_rank.leftOuterJoin(node_w_neighbors)
  final_node_sum = rank_w_neighbors.flatMap(lambda x: prepare_message(x)).union(starting_node_sum).reduceByKey(lambda x, y: x + y)
  return final_node_sum

# Función que calcula el PageRank final de un nodo en base a los mensajes enviados por sus vecinos y el dumping_factor
def update_rank(x, damping_factor, node_count):
    page_rank = damping_factor * x[1] + ((1-damping_factor)/node_count)
    return page_rank

# Función que verifica si el RDD de Page Ranks converge en base a un epsilon
def check_convergence(old_node_rank, new_node_rank):
  eps = 0.001
  rank_join = old_node_rank.join(new_node_rank)
  rank_dif = rank_join.map(lambda x: abs(x[1][1] - x[1][0])).sum()
  if 0.5 * rank_dif <= eps:
    return True
  else:
    return False

# Función que en base a las funciones anteriores calcula el Page Rank para cada nodo
def actualizar_rank(nodes, edges, damping_factor, iteraciones):
  # Creamos un RDD con los nodos y sus Page Rank inicial
  node_rank = nodes.map(lambda x: (x, 1/node_count))
  # Contamos la cantidad total de nodos
  node_count = nodes.count()
  # Iteramos para actualizar los Page Ranks
  for iteracion in range(iteraciones):
    print(f"Iteracion: {iteracion + 1}")

    # Enviamos los mensajes y creamos un RDD provisorio con los nuevos Page Ranks
    node_messages = send_messages(nodes, node_rank)
    new_node_rank = node_messages.map(lambda x: (x[0], update_rank(x, damping_factor, node_count)))

    # Verificamos la convergencia
    convergence_bool = check_convergence(node_rank, new_node_rank)
    # Si hay convergencia se termina el algoritmo en la iteración actual
    if convergence_bool:
      print(f'Convergencia en la iteración {iteracion + 1}')
      return node_rank
    # Si no hay convergencia, se sigue y se termina el algoritmo hasta completar el número máximo de iteraciones
    else:
      node_rank = new_node_rank
  print(f'No hubo convergencia en {iteracion + 1} iteraciones')
  return node_rank

In [317]:
# Creamos un RDD con los nodos y otro con sus aristas
nodes = sc.parallelize([1, 3, 2, 4])
edges = sc.parallelize([(1, 2), (2, 3), (2, 4), (3, 2)])

# Establecemos el damping factor y el número máximo de iteraciones.
damping_factor, max_iteraciones = 1, 20

# Llamamos la función
actualizar_rank(nodes, edges, damping_factor, max_iteraciones).collect()


Iteracion: 1
Iteracion: 2
Iteracion: 3
Iteracion: 4
Iteracion: 5
Iteracion: 6
Iteracion: 7
Iteracion: 8
Iteracion: 9
Iteracion: 10
Iteracion: 11
Iteracion: 12
Iteracion: 13
Iteracion: 14
Iteracion: 15
Iteracion: 16
Iteracion: 17
Iteracion: 18
Iteracion: 19
Convergencia en la iteración 19


[(1, 0.0), (2, 0.00048828125), (3, 0.0009765625), (4, 0.99853515625)]