# 1. Librerías & Set Up

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=99949d3a84d624c9a485781ebdfde634d004bc64c952b16e2555afc4b1348ce9
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [3]:
from pyspark.sql import SparkSession
from itertools import product

In [4]:
#import neo4j [TO-EDIT]
#from neo4j import GraphDatabase
#import pandas as pd

In [5]:
spark = SparkSession.builder \
    .getOrCreate()

sc = spark.sparkContext
sc # Elemento que ejecuta toda instrucción.

# 2. Neo4j Graph

In [6]:
# TODO

In [7]:
graph = [(1,11,2),(1,11,3),(2,11,3),(3,11,2),(3,11,4),(4,11,1),(4,11,2),(4,11,3),(4,12,5),(5,12,1),(5,12,2),(5,12,6)]

# 3. MapReduce Algorithm Implementation

In [8]:
rdd_graph = sc.parallelize(graph)
rdd_graph.collect()

[(1, 11, 2),
 (1, 11, 3),
 (2, 11, 3),
 (3, 11, 2),
 (3, 11, 4),
 (4, 11, 1),
 (4, 11, 2),
 (4, 11, 3),
 (4, 12, 5),
 (5, 12, 1),
 (5, 12, 2),
 (5, 12, 6)]

In [9]:

def hash(n):
  """
  Returns number mod 2. The ouput will be 0 or 1.
  """
  return n % 2

In [10]:
def get_keys(edge, b_dim, b_set, pattern_dim):
  """
  output: retorna las llaves correspondientes para un vertice.

  Idea general: buscamos el par hash_n1,hash_n2 dentro de las posibles
  combinaciones dentro del espacio de imagenes de la funcion de hash.
  Dentro del for, obtenemos un string con la codificacion de las llaves y luego
  verificamos si es una llave candidata para el vertice entregado:

  1.  sequence_in_reducer: Si la secuencia 'b1b2' esta en la llave del reducer
      codificada como hash(n1)hash(n2) ?, donde ? = 0 o 1, entonces se considerará
      el par reducer_key : edge.
  2.  edge_case: El otro caso, es para cuando tenemos por ejemplo x = n2 y z = n2
      para patrones de 3 vertices.
  """

  hash_n1 = hash(edge[0]) # valor de hash para el nodo 1 = b1
  hash_n2 = hash(edge[2]) # valor de hash para el nodo 2 = b2
  values = [] # posible keys
  sequence = '{}{}'.format(hash_n1, hash_n2)

  for i in range(0, b_dim ** pattern_dim):
    reducer = ''.join(str(num) for num in b_set[i])
    sequence_in_reducer = sequence in reducer
    edge_case = reducer[0] == sequence[1] and reducer[pattern_dim - 1] == sequence[0]
    if sequence_in_reducer or edge_case:
        reducer_key = tuple(int(digit) for digit in reducer)
        values.append((reducer_key, edge))

  return values


def map_phase(rdd, b_dim, b_set, pattern_dim):
  """
  input:
    - rdd: RDD del grafo de dimension 'dim'
    - b_dim: Cantidad de elementos de las imagenes de la funcion de hash.
    - b_set: Imagenes de la funcion de hash.
    - pattern_dim: cantidad de nodos del patron de grafo.
  ouput: Mapeo de cada arista con respecto a las llaves
  """

  mapped_keys = rdd.flatMap(lambda edge: get_keys(edge, b_dim, b_set, pattern_dim))
  reducers = mapped_keys.groupByKey().mapValues(list)
  return reducers

In [110]:
def find_patterns(edges, pattern_dim):
  """
  Funcion que retorna todos los patrones encontrados de la forma
  (n1,...,nl) : [[(n1,label,n2), ..., (nl-1,label,nl)], ... ]
  donde (n1,...,nl) corresponde a los nodos que forman el patron
  y la llave corresponde a los patrones posibles formados por estos nodos
  para una etiqueta específica.
  """
  neighbors = get_neighbors(edges)
  cycles = find_cycles(edges, neighbors, pattern_dim)
  return cycles


def get_neighbors(edges):
  """
  Funcion que retorna todas las aristas consecutivas a otras aristas.
  Es decir, aristas vecinas.
  """
  neighbors = []
  for i in range(len(edges)):
    curr_edge = edges[i]
    neighbors.append([])
    for j in range(len(edges)):
      next_edge = edges[j]
      if next_edge == curr_edge:
        continue
      if curr_edge[2] == next_edge[0]:
        neighbors[i].append(next_edge)
  return neighbors


def find_cycles(edges, neighbors, pattern_dim):
    """
    Funcion que verifica si existe algun grafo ciclico en el grafo de una
    dimension pattern_dim. Retorna todos los ciclos encontrados.
    """
    cycles = [] # Lista de ciclos encontrados

    for edge in edges: # Por cada arista del grafo
        visited = set() # Determinamos las aritas que ya han sido visitadas
        # Inicializamos el stack con con la
        stack = [(edge, [edge])]
        # arista donde empezamos el recorrido

        while len(stack):
            curr_edge, path = stack.pop() # Extraemos la primera arista del stack
            # Recorremos esta arista, por lo que la marcamos como visitada
            visited.add(curr_edge)
            # Caso 1: Si el camino recorrido actual supera la cantidad de nodos del bgp,
            #         descartamos este camino
            if len(path) > pattern_dim:
                continue
            # Caso 2: Si el camino recorrido tiene un largo igual a pattern_dim,
            #         y son aristas transitivas, entonces un posible ciclo.
            first_node = path[0][0] # primer nodo del camino
            last_node = path[-1][2] # ultimo nodo del caminmo
            if len(path) == pattern_dim and first_node == last_node:
                cycles.append(path)
                # Agregamos el camino y seguimos recorriendo
                continue
            # Si no es ciclico el camino desde el nodo actual,
            # entonces iteramos dentro de sus vecinos
            for neighbor_edge in neighbors[edges.index(curr_edge)]:
                if neighbor_edge not in visited:
                    stack.append((neighbor_edge, path + [neighbor_edge]))

    return cycles


def get_nodes(pattern, pattern_dim):
  """
  Funcion que retorna la tupla con los nodos que forman un ciclo.
  """
  nodes = []
  for i in range(pattern_dim):
    edge = pattern[i]
    nodes.append(edge[0])
  return (tuple(nodes), pattern)


def get_unique_lists(pattern1, pattern2):
    """
    Funcion que elimina patrones de grafo duplicados para cada llave
    (n1,n2, ..., nl). Si tenemos l = 3, entonces seria (n1,n2,n3)
    """
    if pattern1 == pattern2:
      return pattern1
    else:
      return [pattern1, pattern2]


def reduce_phase(reducers, pattern_dim):
  """
  input: RDD del grafo y cantidad de nodos del patron de grafo.
  """
  reducers_edges = reducers.map(lambda v: v[1]) \
                            .filter(lambda x: len(x) >= pattern_dim)
  patterns = reducers_edges \
            .map(lambda edges: find_patterns(edges, pattern_dim)) \
            .flatMap(list) \
            .map(lambda pattern: get_nodes(pattern, pattern_dim)) \
            .reduceByKey(get_unique_lists)
  return patterns

In [115]:
def map_reduce(rdd_graph, b_dim, b_set, pattern_dim):
  """
  Funcion que simula el algoritmo MapReduce, en donde se distribuye el grafo
  en diferentes reducers y luego obtenemos los posibles patrones
  de grafos formados combinando las informacion de todos los reducers.
  """
  # Fase de Map: Obtenemos las llaves de cada reducer y el conjunto de aristas mapeados a estas llaves.
  reducers = map_phase(rdd_graph, b_dim, b_set, pattern_dim)
  # Fase Reduce: Obtenemos todos los posibles patrones de L nodos.
  patterns = reduce_phase(reducers, pattern_dim)
  return patterns


# 3. MapReduce Algorithm for Triangles

Para encontrar triangulos dentro de un grafo utilizaremos el algoritmo Map Reduce para parametros b_dim = 2 y L = 3. En este caso, al utilizar la funcion de hash modular, generamos un conjunto de 2 imagenes: {0,1}.

Por lo que los posibles reducers son:
```markdown
REDUCERS = [(0, 0, 0),
            (0, 0, 1),
            (0, 1, 0),
            (0, 1, 1),
            (1, 0, 0),
            (1, 0, 1),
            (1, 1, 0),
            (1, 1, 1)]
```
Las aristas se distribuyen dentro de los reducers, por lo que almacenamos el grafo de manera distribuida.

En cada reducer verificamos si las aristas generan triangulos, en el caso que si (formaria un ciclo) se almacenará en una lista y retornaremos los nodos correspondientes.




In [117]:
# Dimension de elementos del conjunto de imagenes de la funcion de hash: |{0,1}|
b_0 = 2
# Dimension del patron de grafo (triangulo para este caso)
l_0 = 3
# Conjunto de imagenes de la funcion de hash
b_set_0 = list(range(b_0))
# Posibles reducers
reducers_0 = list(product(b_set_0, repeat=l_0))

patterns = map_reduce(rdd_graph, b_0, reducers_0, l_0)
patterns.collect()

[((2, 3, 4), [(2, 11, 3), (3, 11, 4), (4, 11, 2)]),
 ((4, 2, 3), [(4, 11, 2), (2, 11, 3), (3, 11, 4)]),
 ((1, 3, 4), [(1, 11, 3), (3, 11, 4), (4, 11, 1)]),
 ((4, 1, 3), [(4, 11, 1), (1, 11, 3), (3, 11, 4)])]

Notamos que encontramos los nodos:
* (2, 3, 4)
* (4, 2, 3)
* (1, 3, 4)
* (4, 1, 3)

# 4. MapReduce Algorithm for Squares


In [122]:
# Como buscaremos los posibles cuadrados a formar, entonces tendremos l=4 nodos
# en el bgp.
l_1 = 4
b_set_1 = list(range(b_0))
reducers_1 = list(product(b_set_1, repeat=l_1))
squares_patterns = map_reduce(rdd_graph, b_0, reducers_1, l_1)

In [123]:
squares_patterns.collect() # Obtenemos todos los posibles patrones cuadrados sin importar el label.

[((1, 2, 3, 4), [(1, 11, 2), (2, 11, 3), (3, 11, 4), (4, 11, 1)]),
 ((2, 3, 4, 5), [(2, 11, 3), (3, 11, 4), (4, 12, 5), (5, 12, 2)]),
 ((2, 3, 4, 3), [(2, 11, 3), (3, 11, 4), (4, 11, 3), (3, 11, 2)]),
 ((3, 2, 3, 4), [(3, 11, 2), (2, 11, 3), (3, 11, 4), (4, 11, 3)]),
 ((3, 4, 5, 2), [(3, 11, 4), (4, 12, 5), (5, 12, 2), (2, 11, 3)]),
 ((4, 5, 2, 3), [(4, 12, 5), (5, 12, 2), (2, 11, 3), (3, 11, 4)]),
 ((5, 2, 3, 4), [(5, 12, 2), (2, 11, 3), (3, 11, 4), (4, 12, 5)]),
 ((2, 3, 4, 1), [(2, 11, 3), (3, 11, 4), (4, 11, 1), (1, 11, 2)]),
 ((4, 1, 2, 3), [(4, 11, 1), (1, 11, 2), (2, 11, 3), (3, 11, 4)]),
 ((1, 3, 4, 5), [(1, 11, 3), (3, 11, 4), (4, 12, 5), (5, 12, 1)]),
 ((3, 4, 5, 1), [(3, 11, 4), (4, 12, 5), (5, 12, 1), (1, 11, 3)]),
 ((5, 1, 3, 4), [(5, 12, 1), (1, 11, 3), (3, 11, 4), (4, 12, 5)])]

In [150]:
def filter_patterns(pattern, pattern_dim, labels):
  nodes = pattern[0]
  edges = pattern[1]
  satisfy = False
  filtered = []
  for i in range(pattern_dim):
    edge_label = edges[i][1]
    satisfy = True if edge_label == labels[i] else False

  if satisfy:
    return nodes, edges

def bgp_query(rdd_graph, b_dim, reducers, pattern_dim, query):
  patterns = map_reduce(rdd_graph, b_dim, reducers, pattern_dim)
  processed_query = query.replace("(", "").replace(")", "").split(",")
  labels = [int(part.strip()) for part in processed_query if part.strip().isdigit()]
  result = patterns.filter(lambda pattern: filter_patterns(pattern, pattern_dim, labels))

  return result.collect()


In [151]:
bgp_query(rdd_graph, b_0, reducers_1, l_1, "(x,11,y), (y,11,z), (z,11,w), (w,11,x)")

[((1, 2, 3, 4), [(1, 11, 2), (2, 11, 3), (3, 11, 4), (4, 11, 1)]),
 ((2, 3, 4, 3), [(2, 11, 3), (3, 11, 4), (4, 11, 3), (3, 11, 2)]),
 ((3, 2, 3, 4), [(3, 11, 2), (2, 11, 3), (3, 11, 4), (4, 11, 3)]),
 ((3, 4, 5, 2), [(3, 11, 4), (4, 12, 5), (5, 12, 2), (2, 11, 3)]),
 ((4, 5, 2, 3), [(4, 12, 5), (5, 12, 2), (2, 11, 3), (3, 11, 4)]),
 ((2, 3, 4, 1), [(2, 11, 3), (3, 11, 4), (4, 11, 1), (1, 11, 2)]),
 ((4, 1, 2, 3), [(4, 11, 1), (1, 11, 2), (2, 11, 3), (3, 11, 4)]),
 ((3, 4, 5, 1), [(3, 11, 4), (4, 12, 5), (5, 12, 1), (1, 11, 3)])]

In [149]:
bgp_query(rdd_graph, b_0, reducers_1, l_1, "(x,11,y), (y,11,z), (z,12,w), (w,12,x)")

[((2, 3, 4, 5), [(2, 11, 3), (3, 11, 4), (4, 12, 5), (5, 12, 2)]),
 ((5, 2, 3, 4), [(5, 12, 2), (2, 11, 3), (3, 11, 4), (4, 12, 5)]),
 ((1, 3, 4, 5), [(1, 11, 3), (3, 11, 4), (4, 12, 5), (5, 12, 1)]),
 ((5, 1, 3, 4), [(5, 12, 1), (1, 11, 3), (3, 11, 4), (4, 12, 5)])]