In [20]:
!pip install pyspark



In [21]:
from pyspark.sql import SparkSession
from tqdm import tqdm

In [22]:
import random

SEED = 42  # fixed seed so the random graph generated below is reproducible
random.seed(SEED)

In [23]:
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Low-level RDD entry point (used below for parallelize / broadcast).
sc = spark.sparkContext
sc

In [24]:
# Build a random directed multigraph on nodes 1..N.
N = 100
nodes = list(range(1, N + 1))
n_nodes = len(nodes)

# N**2 random (src, dst) pairs; duplicate pairs are kept and behave as parallel
# edges (they are counted in `outbound` and each one carries a message).
edges = [(random.randint(1, N), random.randint(1, N)) for _ in range(0, N**2)]
# Drop self-loops by building a new list. The previous approach called
# edges.remove() while iterating over `edges`, which skips the element after
# each removal and could leave self-loops in the graph.
edges = [(src, dst) for src, dst in edges if src != dst]

damp = 0.85                   # PageRank damping factor
init_value = 1 / len(nodes)   # uniform starting rank

# Out-degree of every node in one pass over the edges (instead of scanning all
# edges once per node). Nodes with no outgoing edges keep a count of 0.
outbound = {node: 0 for node in nodes}
for src, _ in edges:
  outbound[src] += 1

init_prob = [(node, init_value) for node in nodes]


In [25]:
# Build the initial rank vector and distribute it, the node list and the edge
# list as RDDs (created in this order).
init_prob_rdd, nodes_rdd, edges_rdd = (
  sc.parallelize(init_prob),
  sc.parallelize(nodes),
  sc.parallelize(edges),
)

In [26]:
# Split each node's current rank evenly across its outgoing edges.
def to_send(probs, outbound_count):
  """Compute the share of rank each node sends along each of its out-edges.

  Args:
    probs: RDD of (node, rank) pairs.
    outbound_count: dict mapping node -> number of outgoing edges. Every node
      in `probs` must be a key (a missing node raises KeyError when evaluated).

  Returns:
    RDD of (node, share) pairs with share = rank / out-degree, or 0.0 for
    nodes without outgoing edges (their rank is not passed on).
  """
  # Broadcast through the RDD's own SparkContext instead of the notebook-global
  # `sc`, so the function does not depend on hidden notebook state.
  # NOTE(review): a new broadcast is created on every call and never destroyed;
  # it cannot be released here because the returned RDD is evaluated lazily.
  bc = probs.context.broadcast(outbound_count)

  def share(pair):
    node, rank = pair
    degree = bc.value[node]  # one lookup per element instead of two
    return (node, rank / degree) if degree != 0 else (node, 0.0)

  return probs.map(share)

In [27]:
# Manual first step: split the uniform starting rank across each node's out-edges.
msg = to_send(probs=init_prob_rdd, outbound_count=outbound)

In [28]:
# Deliver every node's outgoing share along its edges and total what each node receives.
def send_msgs(edges, msgs, nodes):
  """Return the accumulated rank each node receives from its in-neighbours.

  Args:
    edges: RDD of (src, dst) pairs.
    msgs: RDD of (node, share) pairs -- what each node sends per out-edge.
    nodes: RDD of node ids; every one of them appears in the result.

  Returns:
    RDD of (node, received) pairs; nodes that receive nothing get 0.
  """
  # Join shares with edges on the source node: (src, (share, dst)),
  # re-key by destination and sum everything a destination receives.
  received = (
    msgs.join(edges)
        .map(lambda kv: (kv[1][1], kv[1][0]))
        .reduceByKey(lambda a, b: a + b)
  )
  # Left-join from the full node list so nodes without incoming messages still
  # appear; their right side is None, which becomes 0.
  recv = (
    nodes.map(lambda n: (n, 0))
         .leftOuterJoin(received)
         .map(lambda kv: (kv[0], kv[1][1] if kv[1][1] is not None else 0))
  )
  return recv

In [29]:
# Manual step: deliver the shares along the edges and total what each node receives.
col_msgs = send_msgs(edges=edges_rdd, msgs=msg, nodes=nodes_rdd)

In [38]:
# Apply the damping factor to the rank mass produced by send_msgs.
def page_rank_iter(msgs, damp, n_nodes):
  """Turn received rank mass into the next rank vector.

  new_rank = (1 - damp) / n_nodes + damp * received

  Args:
    msgs: RDD of (node, received) pairs, as returned by send_msgs.
    damp: damping factor.
    n_nodes: total number of nodes.

  Returns:
    RDD of (node, new_rank) pairs.
  """
  teleport = (1 - damp) / n_nodes  # uniform "random jump" term

  def dampen(pair):
    node, received = pair
    return (node, teleport + damp * received)

  return msgs.map(dampen)

In [31]:
# Manual step: damp the received mass -> rank after one update.
rank_1 = page_rank_iter(msgs=col_msgs, damp=damp, n_nodes=n_nodes)

In [32]:
# Manual second update: split rank_1 across out-edges.
msg = to_send(probs=rank_1, outbound_count=outbound)

In [33]:
# Manual second update: deliver and accumulate the new shares.
col_msgs = send_msgs(edges=edges_rdd, msgs=msg, nodes=nodes_rdd)

In [34]:
# Manual second update: damp the received mass -> rank after two updates.
rank_2 = page_rank_iter(msgs=col_msgs, damp=damp, n_nodes=n_nodes)

In [35]:
def page_rank(init_probs, outbound_count, edges, damp, n_nodes, nodes, iters=62):
  """Run PageRank power iteration on RDDs.

  Args:
    init_probs: RDD of (node, rank) starting values.
    outbound_count: dict node -> out-degree (passed to to_send).
    edges: RDD of (src, dst) pairs.
    damp: damping factor.
    n_nodes: total number of nodes (for the teleport term).
    nodes: RDD of node ids.
    iters: number of updates run inside the progress-bar loop.

  Returns:
    RDD of (node, rank) pairs.

  NOTE(review): one update is applied before the loop, so iters + 1 updates
  run in total (63 with the default). The loop triggers no Spark action (only
  transformations plus a broadcast per step), so the progress bar presumably
  tracks building the plan rather than the actual computation.
  """
  def one_step(current):
    # to_send -> send_msgs -> page_rank_iter is one full PageRank update.
    outgoing = to_send(current, outbound_count)
    received = send_msgs(edges, outgoing, nodes)
    return page_rank_iter(received, damp, n_nodes)

  rank = one_step(init_probs)
  for _ in tqdm(range(iters)):
    rank = one_step(rank)
  return rank

In [37]:
rank = page_rank(init_prob_rdd, outbound, edges_rdd, damp, n_nodes, nodes_rdd)

# `rank` is still a lazy RDD: no PageRank value has been computed yet.
# takeOrdered is an action, so this actually runs the iterations and returns
# the 10 highest-ranked (node, rank) pairs for display.
top_ranks = rank.takeOrdered(10, key=lambda kv: -kv[1])
top_ranks

100%|██████████| 62/62 [00:07<00:00,  7.94it/s]
