Tarea 2 Procesamiento de Datos Masivos
Departamento de Ciencia de la Computación

Universidad Catolica de Chile

Profesor: Juan Reutter

Alumnos: Juan Vicuña y Clemente Cambara

# Parte 1: Algoritmo Page Rank

In [None]:
pip install pyspark
from pyspark import SparkContext



## Funciones necesarias

In [None]:
sc = SparkContext("local", "PageRank")

# Función para preparar el RDD inicial con los Page Ranks iniciales
def initialize_page_ranks(nodes):
    num_nodes = nodes.count()
    initial_rank = 1.0 / num_nodes
    return nodes.map(lambda node: (node, initial_rank))

# Función para preparar los mensajes que cada nodo enviará
def prepare_messages(node, neighbors, rank):
    num_neighbors = len(neighbors)
    messages = []
    if num_neighbors > 0:
        rank_per_neighbor = rank / num_neighbors
        for neighbor in neighbors:
            messages.append((neighbor, rank_per_neighbor))
    return messages

# Función para intercambiar mensajes entre nodos y realizar el merge de los mensajes recibidos
def send_messages(node, neighbors, rank):
    return prepare_messages(node, neighbors, rank)

# Función para actualizar el valor de Page Rank para cada nodo considerando el damping factor
def update_page_rank(node, old_rank, damping_factor, messages):
    print("Update")
    new_rank = (1 - damping_factor)//num_nodes + damping_factor * messages
    return (node, new_rank)


ValueError: ignored

## Implementación

In [None]:
def PageRanks(page_ranks, max_iterations, min_diff):
  for iteration in range(max_iterations):
      print(f"Iteración numero: {iteration}")
      # Realizar el paso de envío de mensajes entre nodos
      messages = links.join(page_ranks).flatMap(lambda x: send_messages(x[0], x[1][0], x[1][1]))


      # Realizar el paso de merge de los mensajes recibidos por cada nodo
      merged_messages = messages.reduceByKey(lambda x, y: x + y)

      # Actualizar el valor de PageRank para cada nodo considerando el damping factor
      page_ranks = page_ranks.join(merged_messages).map(lambda x: update_page_rank(x[0], x[1][0], 0.85, x[1][1]))

      # Calcular la diferencia entre iteraciones
      diff = page_ranks.join(merged_messages).map(lambda x: abs(x[1][0] - x[1][1])).sum()

      print(f'Diferencia igual a {diff}')
      print("---------------------------------------------------------------------------")

      # Verificar si la diferencia es menor que el umbral mínimo
      if diff < min_diff:
          break

  return page_ranks.collect()


## Cargamos los datos

In [None]:
# Cargar los datos de nodos y enlaces desde un archivo (suponiendo formato: nodo1,nodo2)
links_data = sc.textFile("links.txt")
# Crear el RDD de enlaces
links = links_data.map(lambda x: tuple(x.split(",")))
# Crear el RDD de nodos
nodes = links.flatMap(lambda x: x).distinct()

num_nodes = nodes.count()

## Hiperparámetros

In [None]:
# Definir el número máximo de iteraciones y la diferencia mínima entre iteraciones
max_iterations = 5
min_diff = 0.0001

## Ejecucción

In [None]:
# Inicializar los Page Ranks
page_ranks = initialize_page_ranks(nodes)

final_ranks = PageRanks(page_ranks, max_iterations, min_diff)
for node, rank in final_ranks:
    print("Node:", node, "Rank:", rank)

# Detener el contexto de Spark
sc.stop()

Iteración numero: 0
Diferencia igual a 0.3750000000000001
---------------------------------------------------------------------------
Iteración numero: 1
Diferencia igual a 0.765
---------------------------------------------------------------------------
Iteración numero: 2
Diferencia igual a 1.5714375000000005
---------------------------------------------------------------------------
Iteración numero: 3
Diferencia igual a 3.2241562499999987
---------------------------------------------------------------------------
Iteración numero: 4
Diferencia igual a 6.61642921875
---------------------------------------------------------------------------
Node: A Rank: 10.981706484375
Node: D Rank: 4.5479794531249995
Node: C Rank: 10.981706484375
Node: B Rank: 10.981706484375


# Parte 2: Algoritmo Single Source Shortest Path

## Funciones necesarias

In [None]:
# Crear el contexto de Spark
sc = SparkContext("local", "SSSP")


# Función para preparar los mensajes que cada nodo va a enviar
def prepare_messages(source_cost, destination, cost):
    return [(destination, source_cost + cost)]

# Función para realizar el merge de los mensajes recibidos por cada nodo
def merge_messages(a, b):
    return min(a, b)

# Función para actualizar los costos acumulados de los nodos
def update_node(node, current_cost, message_cost):
    new_cost = min(current_cost, message_cost)
    return (node, new_cost)

In [None]:
sc.stop()

## Implementación

In [None]:
def SSSP(edges_rdd, node_costs):

  # Iterar el algoritmo Single Source Shortest Path
  for iteration in range(max_iterations):
      print(f"Iteracion número: {iteration}")

      # Realizar el paso de envío de mensajes entre nodos
      edges_costs_join = edges_rdd.map(lambda x: (x[0], (x[1], x[2]))).join(node_costs).map(lambda x: (x[0], (x[1][0][0], x[1][0][1], x[1][1])))
      messages = edges_costs_join.flatMap(lambda x: prepare_messages(x[1][2], x[1][0], x[1][1]))

      # Realizar el paso de merge de los mensajes recibidos por cada nodo
      merged_messages = messages.reduceByKey(merge_messages)

      # Actualizar los costos acumulados de los nodos
      node_costs = node_costs.join(merged_messages).map(lambda x: update_node(x[0], x[1][0], x[1][1]))
      print("---------------------------------------")

  return node_costs.collect()


## Hiperparámetros

In [None]:
max_iterations = 2
initial_node = 1

## Creamos un grafo ejemplo

In [None]:
edges = [(1, 2, 10), (2, 3, 3), (2, 4, 24), (3, 2, 1)]

# Crear el RDD de aristas
edges_rdd = sc.parallelize(edges)

# Obtener todos los nodos
nodes = edges_rdd.flatMap(lambda x: (x[0], x[1])).distinct()

# Inicializar los costos acumulados de los nodos
node_costs = nodes.map(lambda x: (x, float("inf")))
node_costs = node_costs.map(lambda x: (initial_node, 0.0) if x[0] == initial_node else x)


## Ejecucción

In [None]:
final_costs = SSSP(edges_rdd, node_costs)

for node, cost in final_costs:
    print("Node:", node, "Cost:", cost)

sc.stop()

Iteracion número: 0
[(2, (3, 3, inf)), (2, (4, 24, inf)), (1, (2, 10, 0.0)), (3, (2, 1, inf))]
[(3, inf), (4, inf), (2, 10.0), (2, inf)]
Iteracion número: 1
[(2, (3, 3, 10.0)), (2, (4, 24, 10.0)), (3, (2, 1, inf))]
[(3, 13.0), (4, 34.0), (2, inf)]
Node: 2 Cost: 10.0
Node: 3 Cost: 13.0
Node: 4 Cost: 34.0
