In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [6]:
from pyspark import SparkConf, SparkContext

# Create (or reuse) the Spark context. getOrCreate keeps this cell re-runnable:
# constructing SparkContext(...) directly raises ValueError when one is already active.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local").setAppName("PageRank"))

# Step 1: graph definition and initial PageRank values.
nodes = [1, 2, 3, 4, 5, 6]
# Directed edges as (source, destination) pairs.
edges = [(1, 2), (2, 3), (2, 4), (3, 2), (4, 5), (4, 6), (5, 1), (6, 3)]
# Sanity check: every edge endpoint must be one of the declared nodes.
assert all(src in nodes and dst in nodes for src, dst in edges), "edge refers to an unknown node"

# Damping factor d of the PageRank update (probability of following a link).
damping_factor = 0.85

# Every node starts with the same rank 1/N, so the initial ranks sum to 1.
initial_ranks = sc.parallelize([(node, 1 / len(nodes)) for node in nodes])

# Edge list as an RDD of (source, destination) pairs.
edges_rdd = sc.parallelize(edges)


# Paso 2: Función para el intercambio de mensajes entre nodos
def send_messages(vecinos):
    num_neighbors = len(vecinos)
    messages = []
    if num_neighbors > 0:
        for rank, neighbor in vecinos:
            rank_per_neighbor = rank / num_neighbors
            messages.append((neighbor, rank_per_neighbor))
    return messages

# Función para realizar el merge de los mensajes recibidos por cada nodo
def merge_messages(rank1, rank2):
    return rank1 + rank2

# Step 3: compute each node's new PageRank from the contributions it received.
def update_rank(node, message, damping=None, num_nodes=None):
    """Apply the damped PageRank update to a single node.

    new_rank = (1 - d) / N + d * message

    Args:
        node: node identifier, returned unchanged.
        message: total rank contribution received by ``node`` in this
            iteration (the caller passes 0 when it received none).
        damping: damping factor ``d``. Defaults to the notebook-level
            ``damping_factor``.
        num_nodes: total number of nodes ``N``. Defaults to ``len(nodes)``.

    Returns:
        A ``(node, new_rank)`` tuple.
    """
    # Defaults resolved at call time so existing two-argument calls keep using
    # the notebook globals.
    if damping is None:
        damping = damping_factor
    if num_nodes is None:
        num_nodes = len(nodes)
    # Teleport term: the (1 - d) share spread uniformly over all nodes.
    teleport = (1 - damping) / num_nodes
    # Kept free of side effects (no print): it runs inside RDD.map and may be
    # re-executed whenever the RDD is recomputed.
    return (node, teleport + damping * message)

In [7]:
# Step 4: repeat message passing + rank update until the total absolute change
# drops below `convergence_threshold` or `max_iterations` is reached.
max_iterations = 2
convergence_threshold = 0.001

ranks = initial_ranks

for iteration in range(max_iterations):
    # Step 2: every source node splits its rank among its out-neighbours, then
    # contributions addressed to the same node are summed.
    messages = ranks.join(edges_rdd).groupByKey().flatMap(lambda kv: send_messages(kv[1]))
    received_messages = messages.reduceByKey(merge_messages)

    # Step 3: the left side of leftOuterJoin is always present; nodes missing
    # from received_messages get a contribution of 0 (teleport term only).
    new_ranks = (
        ranks.leftOuterJoin(received_messages)
        .map(lambda kv: update_rank(kv[0], kv[1][1] if kv[1][1] is not None else 0))
        .cache()  # used by the convergence check and by the next iteration
    )

    # Convergence check: sum of absolute rank changes over all nodes
    # (.sum() returns 0 on an empty RDD, whereas reduce() would raise).
    rank_diff = new_ranks.join(ranks).map(lambda kv: abs(kv[1][0] - kv[1][1])).sum()

    # Adopt the new ranks before deciding whether to stop, so the most recent
    # values are the ones reported.
    ranks.unpersist()
    ranks = new_ranks
    if rank_diff < convergence_threshold:
        break

# Print results ordered by node id (collect() order follows Spark partitioning).
result = sorted(ranks.collect())
for node, rank in result:
    print(f"Node {node}: Page Rank = {rank}")

# Stop Spark. After this, `sc` and the RDDs above are unusable until the setup
# cell is run again.
sc.stop()


Node 1: Page Rank = 0.10645833333333335
Node 2: Page Rank = 0.3685416666666667
Node 3: Page Rank = 0.2375
Node 4: Page Rank = 0.15604166666666666
Node 5: Page Rank = 0.06572916666666667
Node 6: Page Rank = 0.06572916666666667
