In [1]:
# !pip install pyspark -q

In [2]:
from pyspark import SparkContext

In [3]:
# Inicializar SparkContext
sc = SparkContext("local", "PageRank")

23/06/24 18:31:01 WARN Utils: Your hostname, santiago-B460MAORUSPRO resolves to a loopback address: 127.0.1.1; using 192.168.31.56 instead (on interface eno1)
23/06/24 18:31:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/24 18:31:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/06/24 18:31:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/06/24 18:31:02 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [4]:
# Función para preparar el RDD con los Page Ranks iniciales
def initialize_page_ranks(nodes):
    initial_rank = 1.0 / len(nodes)
    return sc.parallelize([(node, initial_rank) for node in nodes])

# Función para preparar el mensaje que cada nodo enviará
def prepare_messages(node, neighbors, rank):
    num_neighbors = len(neighbors)
    if num_neighbors == 0:
        return []
    else:
        rank_per_neighbor = rank / num_neighbors
        return [(neighbor, rank_per_neighbor) for neighbor in neighbors]

# Función para realizar el intercambio de mensajes entre nodos
def exchange_messages(nodes, edges, ranks):
    edges_rdd = sc.parallelize(edges)
    neighbor_ranks = edges_rdd.join(ranks).flatMap(lambda x: prepare_messages(x[0], x[1][0], x[1][1]))
    return neighbor_ranks.reduceByKey(lambda x, y: x + y)

# Función para actualizar el valor de Page Rank considerando el damping factor
def update_page_ranks(ranks, damping_factor, num_nodes):
    return ranks.mapValues(lambda rank: (damping_factor * rank) + ((1 - damping_factor) / num_nodes))

# Función para verificar la convergencia del algoritmo
def check_convergence(prev_ranks, current_ranks, tolerance):
    return prev_ranks.join(current_ranks).map(lambda x: abs(x[1][0] - x[1][1])).max() <= tolerance

In [5]:
# Grafo de ejemplo
nodes = [1, 2, 3, 4]
edges = [(1, [2]), (2, [3, 4]), (3, [2, 4])]

# Parámetros del algoritmo
damping_factor = 0.65
max_iterations = 10
tolerance = 0.01

In [6]:
# Paso 1: Preparar RDD con Page Ranks iniciales
ranks = initialize_page_ranks(nodes)

# Inicializar variables para la verificación de convergencia
prev_ranks = None
current_ranks = ranks

# Iterar el algoritmo hasta que se alcance la convergencia o el número máximo de iteraciones
for iteration in range(max_iterations):
    # Paso 2: Intercambio de mensajes entre nodos
    messages = exchange_messages(nodes, edges, current_ranks)

    # Paso 3: Actualización del valor de Page Rank
    ranks = update_page_ranks(messages, damping_factor, len(nodes))

    # Comprobar convergencia
    if iteration > 0:
        prev_ranks = current_ranks
    current_ranks = ranks
    if iteration > 0 and check_convergence(prev_ranks, current_ranks, tolerance):
        break

# Resultado final: cada nodo junto con su valor de Page Rank
result = ranks.collect()
for node, rank in result:
    print("Node:", node, "Page Rank:", rank)

# Detener SparkContext
sc.stop()

                                                                                

Node: 2 Page Rank: 0.13187903564453124
Node: 3 Page Rank: 0.13006608154296873
Node: 4 Page Rank: 0.1744451171875
