### Tarea 2: Procesamiento de Datos Masivos
#### Hecho por: Víctor Bórquez

In [2]:
#!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=55c8443ac3a40268eb3197cb882c5831b6a51d44bd1cff77057b49a0bc427d85
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


#### Problema 1) Page Rank en un entorno distribuido

Para poder implementar el algoritmo debemos seguir 4 pasos principales

- Paso 1: Preparar un RDD que tenga cada nodo con su PageRank inicial
- Paso 2: Crear una función que se encargue del intercambio de mensajes entre nodos
- Paso 3: Crear una función que actualice el valor de Page Rank para cada nodo considerando el damping factor
- Paso 4: Iterar

#### Que es el damping factor?

> También conocido como el factor de amortiguación es un número entre 0 y 1 que se utiliza en el algoritmo PageRank y representa la probabilidad de que un usuario haga clic en un enlace de una página web de manera aleatoria.

> Es utilizado en el algoritmo de PageRank para prevenir que las páginas obtengan un PageRank demasiado alto simplemente por tener muchos enlaces


> Según lo que estuve leyendo el valor recomendado para el damping factor es de 85% (http://www.pagerank.dk/Pagerank-formula/Damping-factor.htm)







In [3]:
from pyspark import SparkConf, SparkContext
from random import randint
import pandas as pd
import networkx as nx
import sys

In [4]:
# Creamos la función que se encarga del intercambio de mensajes entre nodos
# Donde cada nodo distribuye su PageRank actual entre sus vecinos
def compute_contributions(nodes_with_rank):
    node, (neighbors, rank) = nodes_with_rank
    num_neighbors = len(neighbors)
    for neighbor in neighbors:
        yield (neighbor, rank / num_neighbors)

def page_rank(sc,nodes,edges, epsilon=1e-3):
    num_nodes = len(nodes)

    # Paso 1: Preparamos un RDD que tenga cada nodo con su PageRank inicial (1/4 en este caso)
    page_rank_inicial = 1/num_nodes
    ranks = sc.parallelize(nodes).map(lambda node: (node, page_rank_inicial))

    # Preparar los datos de las aristas e inicializamos el damping factor
    links = sc.parallelize(edges).distinct().groupByKey().cache()
    damping_factor = 0.85

    # Iterar el cálculo de PageRank
    c = 1
    while True:
        contributions = links.join(ranks).flatMap(compute_contributions)

        # Paso 3: Actualizar el valor de PageRank para cada nodo considerando el damping factor
        new_ranks = contributions.reduceByKey(lambda x, y: x + y).mapValues(lambda rank: rank * damping_factor + (1 - damping_factor) / num_nodes)

        # Comprobar si los cambios son significativos
        diff = new_ranks.join(ranks).map(lambda x: abs(x[1][0] - x[1][1])).reduce(lambda x, y: x + y)
        if diff < epsilon:
            break
        print(f'Iteracion {c} // Diferencia es: {diff}')
        ranks = new_ranks
        c+=1
    return ranks

In [7]:
# Configuramos Spark e inicializamos los de datos según el ejemplo del Enunciado
conf = SparkConf().setAppName("PageRank")
sc = SparkContext.getOrCreate(conf=conf)
nodes = [1, 2, 3, 4]
edges = [(1, 2), (2, 3), (2, 4), (3, 2)]
# Se puede agregar un 3er argumento que corresponde a lo que se considera como cambio no significativo
# Con epsilon igual a 0.001 se demora 3-5 minutos
result = page_rank(sc,nodes,edges)
result.collect()

Iteracion 1 // Diferencia es: 0.42500000000000004
Iteracion 2 // Diferencia es: 0.4834375000000001
Iteracion 3 // Diferencia es: 0.33415625000000004
Iteracion 4 // Diferencia es: 0.17464179687500003
Iteracion 5 // Diferencia es: 0.12071394531250002
Iteracion 6 // Diferencia es: 0.06308934912109376
Iteracion 7 // Diferencia es: 0.04360791274414064
Iteracion 8 // Diferencia es: 0.02279102736999511
Iteracion 9 // Diferencia es: 0.015753358478820798
Iteracion 10 // Diferencia es: 0.008233258637410729
Iteracion 11 // Diferencia es: 0.005690900750474001
Iteracion 12 // Diferencia es: 0.002974264682764627
Iteracion 13 // Diferencia es: 0.0020558378961087337
Iteracion 14 // Diferencia es: 0.0010744531166487215


[(2, 0.10872408739945584), (3, 0.08399376601016231), (4, 0.08399376601016231)]

Ahora veamos el algoritmo con un set de datos más grande, como es cora

In [5]:
df = pd.read_csv('sample_data/cora.cites',sep="\t",
    header=None,
    names=["target", "source"])

In [8]:
G = nx.read_edgelist('sample_data/cora.cites', nodetype=int, delimiter="\t", create_using=nx.DiGraph())
nodes1 = list(G.nodes)
edges1 = list(G.edges)
# Con el epsilon predeterminado se demora 2 minutos
result = page_rank(sc,nodes1,edges1)
result.collect()

Iteracion 1 // Diferencia es: 0.391979413044289
Iteracion 2 // Diferencia es: 0.2402357952313945
Iteracion 3 // Diferencia es: 0.08339565728470656
Iteracion 4 // Diferencia es: 0.029933084077044023
Iteracion 5 // Diferencia es: 0.012437048827688332
Iteracion 6 // Diferencia es: 0.006144770709285498
Iteracion 7 // Diferencia es: 0.003932151647384856
Iteracion 8 // Diferencia es: 0.002619597545578436
Iteracion 9 // Diferencia es: 0.0019349231167914475
Iteracion 10 // Diferencia es: 0.0014206449113701967
Iteracion 11 // Diferencia es: 0.0011253855374427982


[(56112, 6.992266995075556e-05),
 (444240, 0.00011463070843755607),
 (964248, 0.0001396557453426016),
 (1130808, 6.160869570255428e-05),
 (1132968, 7.750611024468599e-05),
 (1116336, 8.829963746860866e-05),
 (1154232, 8.470387334503914e-05),
 (195792, 9.521937819668717e-05),
 (78552, 7.879431605745959e-05),
 (1119216, 9.588849356971586e-05),
 (155736, 0.00012942799355603843),
 (1130856, 6.044879237605377e-05),
 (1107312, 0.0001988261040567206),
 (593328, 0.0007698098434699927),
 (1153056, 6.350354093216617e-05),
 (8832, 5.8306398727170815e-05),
 (124296, 7.132959813786269e-05),
 (7032, 6.150246482920945e-05),
 (5064, 0.00011286301190602136),
 (459216, 6.072687615318928e-05),
 (646440, 0.00020929746278815418),
 (1272, 7.176613706933989e-05),
 (1131312, 8.124473356850249e-05),
 (308232, 7.2359084040325e-05),
 (1114992, 0.0001663344812793448),
 (126912, 0.00016300965920889118),
 (1153728, 6.791737736185554e-05),
 (55968, 7.437785302663987e-05),
 (1131360, 5.954451633777986e-05),
 (1110768

#### Problema 2) Single Source Shortest Path

In [9]:
def compute_costs(node_data):
        node, (links, cost) = node_data
        yield (node, cost) # Esto asegura que cada nodo envíe su costo actual
        for link in links:
            if link: # Ignora la lista vacía
                yield (link[0], cost + link[1])

def single_source_shortest_path(sc,edges_rdd, source_node):

    links = edges_rdd.flatMap(lambda x: [(x[0], (x[1], x[2])), (x[1], [])]).groupByKey().mapValues(list)

    #Establece el costo acumulado inicial para cada nodo
    initial_costs = links.map(lambda x: (x[0], 0 if x[0] == source_node else float('inf')))

    # Itera hasta que los costos no cambien
    c = 0
    while True:
        c += 1
        new_costs = links.join(initial_costs).flatMap(compute_costs).reduceByKey(min)
        num_changes = new_costs.join(initial_costs).filter(lambda x: x[1][0] != x[1][1]).count()
        print(f'Iteración: {c} // Cambios: {num_changes}')
        if num_changes == 0:
            break

        initial_costs = new_costs
    return initial_costs

In [10]:
# Configuración inicial de Spark
conf = SparkConf().setAppName("SingleSourceShortestPath")
sc2 = SparkContext.getOrCreate(conf=conf)

# Inicialización de los datos de las aristas
edges = [(1, 2, 10), (2, 3, 3), (2, 4, 24), (3, 2, 1)]
edges_rdd = sc2.parallelize(edges)
source_node = 1

costos = single_source_shortest_path(sc2,edges_rdd,source_node)
costos.collect()

Iteración: 1 // Cambios: 1
Iteración: 2 // Cambios: 2
Iteración: 3 // Cambios: 0


[(1, 0), (2, 10), (3, 13), (4, 34)]

In [11]:
# Ahora veamos con una mayor cantidad de nodos e aristas
# Donde cada arista es de la misma forma que para el ejemplo anterior
# (nodo_inicial, nodo_final, costo)
nodos_sssp = [i for i in range(25)]
edges_sssp = [(randint(0,25),randint(0,25),randint(1,50)) for i in range(60)]
# Tomamos un nodo inicial cualquiera
source_node2 = edges_sssp[0][0]

In [12]:
edges2_rdd = sc2.parallelize(edges_sssp)
costos2 = single_source_shortest_path(sc2,edges2_rdd,source_node2)
costos2.collect()

Iteración: 1 // Cambios: 3
Iteración: 2 // Cambios: 10
Iteración: 3 // Cambios: 8
Iteración: 4 // Cambios: 3
Iteración: 5 // Cambios: 1
Iteración: 6 // Cambios: 0


[(12, 43),
 (24, 62),
 (0, 107),
 (25, 77),
 (13, inf),
 (1, 32),
 (2, 60),
 (14, 37),
 (15, 94),
 (3, 116),
 (16, 54),
 (4, 46),
 (5, 60),
 (17, 61),
 (18, 58),
 (6, 29),
 (19, 24),
 (7, 50),
 (8, 90),
 (20, 26),
 (9, 0),
 (21, 107),
 (22, 65),
 (10, 93),
 (23, 42),
 (11, 23)]