## Iniciar Spark Session

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
sc

24/06/17 23:47:40 WARN Utils: Your hostname, MacBook-Pro-de-Sebastian-1028.local resolves to a loopback address: 127.0.0.1; using 192.168.1.200 instead (on interface en0)
24/06/17 23:47:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/17 23:47:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Instanciar BBD Neo4j

In [2]:
import neo4j
from neo4j import GraphDatabase
import itertools

In [3]:
URI = "neo4j+s://a3d230a8.databases.neo4j.io"
AUTH = ("neo4j", "PDOH3crDmCKi-BEnScXmynkqDkuc2bZGmv5WgHgG6lQ")

driver = GraphDatabase.driver(URI, auth=AUTH)
with driver.session() as session:
    try:
        session.run("RETURN 1")
        print("Connection to Neo4j established successfully!")
    except Exception as e:
        print(f"Failed to connect to Neo4j: {e}")

Connection to Neo4j established successfully!


In [4]:
def create_graph(tx, graph_data):
    # Create nodes
    nodes = set()
    for start_node, relationship_type, end_node in graph_data:
        nodes.add(start_node)
        nodes.add(end_node)

    for node in nodes:
        tx.run("MERGE (n:Node {id: $id})", id=node)

    # Create relationships
    for start_node, relationship_type, end_node in graph_data:
        tx.run(
            """
            MATCH (a:Node {id: $start_id})
            MATCH (b:Node {id: $end_id})
            MERGE (a)-[r:RELATIONSHIP {type: $type}]->(b)
            """,
            start_id=start_node,
            end_id=end_node,
            type=relationship_type,
        )

In [5]:
# Example graph data

graph_data = [
    (1, 11, 2),
    (1, 11, 3),
    (2, 11, 3),
    (3, 11, 2),
    (3, 11, 4),
    (4, 11, 1),
    (4, 11, 2),
    (4, 11, 3),
    (4, 12, 5),
    (5, 12, 1),
    (5, 12, 2),
    (5, 12, 6),
]

In [6]:
with driver.session() as session:
    try:
        session.execute_write(create_graph, graph_data)
        print("Graph instantiated successfully!")
    except Exception as e:
        print(f"Failed to instantiate the graph: {e}")

Graph instantiated successfully!


## Problema 1

Implementa una función que reciba un grafo en Neo4j y genere una RDD con las aristas de ese grafo.


In [7]:
from collections import namedtuple

Edge = namedtuple("Edge", ["n1", "R", "n2"])

In [8]:
query = """
    MATCH (a:Paper)-[r:CITES]->(b:Paper)
    RETURN a.id AS start_node, b.id AS end_node
    """

with driver.session() as session:
    result = session.run(query)
    edges = [
        Edge(record["start_node"], "cites", record["end_node"]) for record in result
    ]

graph_rdd = sc.parallelize(edges)

graph_rdd.collect()[:15]

[Edge(n1=31349, R='cites', n2=31336),
 Edge(n1=686532, R='cites', n2=31336),
 Edge(n1=1129442, R='cites', n2=31336),
 Edge(n1=1107312, R='cites', n2=13195),
 Edge(n1=1120731, R='cites', n2=13195),
 Edge(n1=755217, R='cites', n2=13195),
 Edge(n1=1105116, R='cites', n2=37879),
 Edge(n1=686532, R='cites', n2=31349),
 Edge(n1=137849, R='cites', n2=109323),
 Edge(n1=154134, R='cites', n2=217139),
 Edge(n1=31336, R='cites', n2=31353),
 Edge(n1=31349, R='cites', n2=31353),
 Edge(n1=1152272, R='cites', n2=31353),
 Edge(n1=1124844, R='cites', n2=31353),
 Edge(n1=1135746, R='cites', n2=31353)]

## Problema 2

* Implementa un programa en PySpark que entregue todos los triángulos (como tuplas de tres nodos) en el grafo usando b3 reducers, donde b es un parámetro. Para esta primera parte puedes asumir que tu grafo solo usa una etiqueta de arista (en el grafo de prueba, esa etiqueta corresponde al numero 11).

### Fase de Map

* Generamos pares de (llave, valor) para las aristas del grafo.

In [9]:
# Map procedure
def hash_node(node_id: int, b: int) -> int:
    return hash(node_id) % b


def generate_n_keys(edge, b, n):
    hashed_n1 = hash_node(edge.n1, b)
    hashed_n2 = hash_node(edge.n2, b)

    combinations = itertools.product(range(b), repeat=n - 2)

    return (((hashed_n1, hashed_n2) + comb, edge) for comb in combinations)

In [10]:
def generate_shifts(key):
    n = len(key)
    gen = [tuple(key[i:] + key[:i]) for i in range(n)]
    return gen


def map_to_shifts(key_edge_tuple):
    key, edge_value = key_edge_tuple

    return [((shifted_key), edge_value) for shifted_key in generate_shifts(key)]

### Fase de Reduce

* Agrupamos llaves según si coinciden en términos de un shift de carrusel: (b1, b2, b3) = (b2, b3, b1) = (b3, b1, b2)
* Detectamos ciclos del tamaño deseado para cada reducer mediante DFS y retornamos los nodos asociados

In [11]:
def create_graph(edges):
    graph = {}

    for edge in edges:
        start = edge.n1
        end = edge.n2
        if start not in graph:
            graph[start] = []
        graph[start].append(end)

    return graph


def dfs_iterative(graph, start_node, n):
    stack = [(start_node, 0, [start_node])]
    visited_paths = set()

    while stack:
        node, path_length, path = stack.pop()

        if path_length == n:
            if node == start_node:
                return True, tuple(set(path))
            continue

        if node in graph:
            for neighbor in graph[node]:
                new_path = path + [neighbor]
                new_path_tuple = (node, neighbor)

                # Here we add memory to dont have repeated paths, so 2 -> 3, 3 -> 2, is a cycle of length 2, not 2n
                if new_path_tuple not in visited_paths:
                    visited_paths.add((node, neighbor))
                    stack.append((neighbor, path_length + 1, new_path))
                    visited_paths.add(new_path_tuple)

    return False, ()


def has_cycle_of_length_n(edges, n):
    graph = create_graph(edges)
    cycles = set()
    found = False

    for node in graph:
        cycle_detected, cycle_nodes = dfs_iterative(graph, node, n)

        if cycle_detected:
            found = True
            cycles.add(cycle_nodes)
                       
    if found:
        return True, list(cycles)
    else:
        return False, [()]

In [12]:
# Example usage:
edges = [
    Edge(1, "cites", 2),
    Edge(2, "cites", 3),
    Edge(3, "cites", 4),
    Edge(4, "cites", 1),
    Edge(2, "cites", 4),
    Edge(4, "cites", 5),
]
n = 4  # Length of the cycle we want to detect


cycle_detected, cycle_nodes = has_cycle_of_length_n(edges, n)

if cycle_detected:
    print(f"Cycle of length {n} detected: {cycle_nodes}")
else:
    print(f"No cycle of length {n} found.")

Cycle of length 4 detected: [(1, 2, 3, 4)]


### Pipeline de datos

In [13]:
def pipeline_uni_relation(graph, b, n):

    #1: Map to the keys
    mapped_graph_rdd = graph.flatMap(lambda edge: generate_n_keys(edge, b, n))

    #2: Shift the keys
    aggregated_rdd = mapped_graph_rdd.flatMap(map_to_shifts).groupByKey().mapValues(list)

    #3: Detect cycles
    cycles_rdd = aggregated_rdd.map(
        lambda key_edge_pair: (key_edge_pair[0], has_cycle_of_length_n(key_edge_pair[1], n))
    )

    #4: Filter the cycles
    filtered_rdd = cycles_rdd.filter(lambda x: x[1][0])

    #5: Extract nodes
    nodes_rdd = filtered_rdd.flatMap(lambda x: [(x[0], path) for path in x[1][1]])

    #6: Sort the nodes
    nodes_rdd2 = nodes_rdd.map(lambda x: (x[0], tuple(sorted(list(x[1])))))

    #7: Remove duplicates
    queried_triangles = nodes_rdd2.values().distinct()

    return queried_triangles
    
    

In [14]:
b = 7
n = 3

result = pipeline_uni_relation(graph_rdd, b, n).collect()
len(result), result

                                                                                

(30,
 [(753047, 753070, 753264),
  (3191, 3192, 5086),
  (12195, 12350, 51180),
  (1997, 3233, 49811),
  (9586, 33818, 78557),
  (60159, 60169, 399370),
  (35, 35061, 210871),
  (648106, 648112, 648121),
  (16819, 643221, 643239),
  (23502, 184918, 330148),
  (642894, 643221, 643485),
  (103543, 126920, 126927),
  (12576, 56112, 83725),
  (5064, 5069, 28026),
  (12195, 38722, 51180),
  (6898, 12631, 12638),
  (12350, 51180, 67415),
  (35, 210871, 273152),
  (153063, 561568, 561613),
  (642894, 643221, 643239),
  (33818, 78552, 78557),
  (34263, 34266, 87482),
  (2695, 2698, 342802),
  (643221, 643485, 644577),
  (31932, 194617, 215912),
  (20178, 64271, 91852),
  (119761, 143801, 284025),
  (9586, 33818, 78552),
  (31769, 67245, 67246),
  (126920, 126927, 645897)])

## Problema 3

* Asume ahora que recibes un subgrafo como tres arreglos: un arreglo A con las variables, otro L con los tipos de aristas, y una matriz M de tamaño |A| × |L| × |A| que tiene un uno en la posicion (x, R, y) si y solo si (x, R, y) es una arista de tu subgrafo. 
* Implementa un programa en PySpark que reciba un patrón que tiene solo variables, y exactamente cuatro variables, y entregue todos los matches de ese patrón (como tuplas de 4 nodos) en el grafo usando b4 reducers, donde b es un parámetro.

### Map

Para este caso el Map se hara solamente para los casos las combinaciones donde las relaciones son especificadas en el patrón.

Con esto reducimos el espacio de búsqueda y incorporamos las relaciones en el proceso de búsqueda.

Primero, pensando en que `n1` puede hacer match con `x` y `n2` con `y`, el mapper debe generar llaves `(h(n1), h(n2), 0)`, ..., `(h(n1), h(n2), b - 1)`, y emitir los mensajes que corresponden a esa llave con el valor `(n1, r, n2)`.

Pensando en que `n1` puede hacer match con `y` y `n2` con `z`, el mapper debe generar llaves `(0, h(n1), h(n2))`, ..., `(b - 1, h(n1), h(n2))`, y emitir los mensajes que corresponden a esa llave con el valor `(n1, r, n2)`.

Finalmente, pensando en que `n1` puede hacer match con `z` y `n2` con `x`, el mapper debe generar llaves `(h(n2), 0, h(n1))`, ..., `(h(n2), b - 1, h(n1))`, y emitir los mensajes que corresponden a esa llave con el valor `(n1, r, n2)`.


In [15]:
# Map procedure
def hash_node(node_id: int, b: int) -> int:
    return hash(node_id) % b



def generate_keys(edge, b, rel_order):
    hashed_n1 = hash_node(edge.n1, b)
    hashed_n2 = hash_node(edge.n2, b)
    edge_rel = edge.R

    keys = []
    n = len(rel_order)
    combinations = itertools.product(range(b), repeat=n - 2)

    # Generar llaves según el orden de las relaciones
    for comb in combinations:
        if edge_rel == rel_order[0]:
            keys.append(((hashed_n1, hashed_n2) + comb, edge))

        if edge_rel == rel_order[1]:
            keys.append(((comb[0], hashed_n1, hashed_n2) + comb[1:], edge))

        if n == 3 and edge_rel == rel_order[2]:
            keys.append(((hashed_n2, comb[0], hashed_n1) + comb[1:], edge))

        if n == 4:
            if edge_rel == rel_order[2]:
                keys.append(((comb[0], comb[1], hashed_n1, hashed_n2), edge))
                
            if edge_rel == rel_order[3]:
                keys.append(((hashed_n2, comb[0], comb[1], hashed_n1), edge))

    return keys

### Reduce

#### Adapt DFS

Para este caso, lo que hicimos fue adaptar el anterior algoritmo de DFS, para asi incoporar las relaciones y que ademas de buscar, tambien asegurarnos el grafo sigue cierto orden sequencial.

In [16]:
def create_graph(edges):
    graph = {}

    for edge in edges:
        start = edge.n1
        end = edge.n2
        relation = edge.R
        if start not in graph:
            graph[start] = []
        graph[start].append((end, relation))

    return graph


def dfs__relation_iterative(graph, start_node, n, rel_order):
    stack = [(start_node, 0, [start_node], [])]
    visited_paths = set()

    while stack:
        node, path_length, path, rels = stack.pop()

        if path_length == n:
            if node == start_node and rels == rel_order:
                return True, tuple(set(path))
            continue

        if node in graph:
            for neighbor, relation in graph[node]:
                new_path = path + [neighbor]
                new_rels = rels + [relation]
                new_path_tuple = (node, relation, neighbor)

                # Here we add memory to dont have repeated paths, so 2 -> 3, 3 -> 2, is a cycle of length 2, not 2n
                if new_path_tuple not in visited_paths:
                    visited_paths.add(new_path_tuple)
                    stack.append((neighbor, path_length + 1, new_path, new_rels))
                    
    return False, ()


def has_cycle_of_length_n_with_order(edges, n, rel_order):
    graph = create_graph(edges)
    cycles = set()
    found = False

    for node in graph:
        cycle_detected, cycle_nodes = dfs__relation_iterative(graph, node, n, rel_order)

        if cycle_detected:
            found = True
            cycles.add(cycle_nodes)
                       
    if found:
        return True, list(cycles)
    else:
        return False, [()]

#### Pipeline de Map-Reduce

In [17]:
def pipeline_relation(graph, relations, b):

    n = len(relations)  # Length of the cycle we want to detect
    
    #1: Map to the keys
    mapped_graph_rdd = graph.flatMap(lambda edge: generate_keys(edge, b, relations))

    #2: group by keys
    aggregated_rdd = mapped_graph_rdd.groupByKey().mapValues(list)

    #3: Detect cycles
    cycles_rdd = aggregated_rdd.map(
        lambda key_edge_pair: (
            key_edge_pair[0],
            has_cycle_of_length_n_with_order(key_edge_pair[1], n, relations),
        )
    )

    #4: Filter the cycles
    filtered_rdd = cycles_rdd.filter(lambda x: x[1][0])

    #5: Extract nodes
    nodes_rdd = filtered_rdd.flatMap(lambda x: [(x[0], path) for path in x[1][1]])

    #6: Delete duplicates
    nodes_rdd2 = nodes_rdd.map(lambda x: (tuple(sorted(x[1])), ( x[1])))
    grouped_by_neighbors = nodes_rdd2.groupByKey()
    distinct_nodes = grouped_by_neighbors.map(lambda x: (list(x[1])[0]))

    return distinct_nodes
    

In [33]:
def print_results(results, relations):
    for nodes in results:
            output = ""
            for i in range(len(nodes)):
            # Agrega cada relación a la cadena de salida
                output += f"({nodes[i]} -> {relations[i]} > {nodes[(i + 1) % len(nodes)]}), "

            # Imprime la cadena de salida sin la última coma y espacio
            print(output[:-2])

##### Grafo de papers

In [24]:
b = 10  # Number of buckets
R_1 = "cites"
R_2 = "cites"
R_3 = "cites"
R_4 = "cites"
relations = [R_1, R_2, R_3, R_4]

results = pipeline_relation(graph_rdd, relations, b).collect()
len(results), results

(10,
 [(33818, 78557, 9586, 78511),
  (12195, 51180, 12350, 67415),
  (16819, 643221, 642894, 643239),
  (87482, 34266, 34263, 90655),
  (12210, 51180, 12350, 67415),
  (38722, 51180, 12350, 67415),
  (78552, 9586, 33818, 78557),
  (78552, 9586, 33818, 78511),
  (38722, 12195, 51180, 12350),
  (35, 35061, 141342, 210871)])

#### Graph example

Ejemplo del grafo de clase

In [19]:
data = [
    Edge(1, 11, 2),
    Edge(1, 11, 3),
    Edge(2, 11, 3),
    Edge(3, 11, 2),
    Edge(3, 11, 4),
    Edge(4, 11, 1),
    Edge(4, 11, 2),
    Edge(4, 11, 3),
    Edge(4, 12, 5),
    Edge(5, 12, 1),
    Edge(5, 12, 2),
    Edge(5, 12, 6),
]
edge_rdd = sc.parallelize(data)

edge_rdd.collect()

[Edge(n1=1, R=11, n2=2),
 Edge(n1=1, R=11, n2=3),
 Edge(n1=2, R=11, n2=3),
 Edge(n1=3, R=11, n2=2),
 Edge(n1=3, R=11, n2=4),
 Edge(n1=4, R=11, n2=1),
 Edge(n1=4, R=11, n2=2),
 Edge(n1=4, R=11, n2=3),
 Edge(n1=4, R=12, n2=5),
 Edge(n1=5, R=12, n2=1),
 Edge(n1=5, R=12, n2=2),
 Edge(n1=5, R=12, n2=6)]

In [45]:
b = 10  # Number of buckets
R_1 = 11
R_2 = 11
R_3 = 12
R_4 = 12
relations = [R_1, R_2, R_3, R_4]

result = pipeline_relation(edge_rdd, relations, b).collect()
len(result), result

(2, [(2, 3, 4, 5), (1, 3, 4, 5)])

In [46]:
print_results(result, relations)

(2 -> 11 > 3), (3 -> 11 > 4), (4 -> 12 > 5), (5 -> 12 > 2)
(1 -> 11 > 3), (3 -> 11 > 4), (4 -> 12 > 5), (5 -> 12 > 1)


#### Desafíos/Problemas a resolver

In [21]:
import time

R_1 = "cites"
R_2 = "cites"
R_3 = "cites"
R_4 = "cites"
relations = [R_1, R_2, R_3, R_4]

for b in [2, 3, 5, 10, 20, 30]:
    before = time.time()
    result = pipeline_relation(graph_rdd, relations, b).collect()
    after = time.time()
    print(f"Number of buckets(b): {b:,} - b^4: {b**4:.2e}, Number of cycles: {len(result):,}, Time: {after - before:.2f} seconds")

Number of buckets(b): 2 - b^4: 1.60e+01, Number of cycles: 9, Time: 0.88 seconds
Number of buckets(b): 3 - b^4: 8.10e+01, Number of cycles: 10, Time: 0.48 seconds
Number of buckets(b): 5 - b^4: 6.25e+02, Number of cycles: 8, Time: 0.72 seconds


                                                                                

Number of buckets(b): 10 - b^4: 1.00e+04, Number of cycles: 10, Time: 1.75 seconds


                                                                                

Number of buckets(b): 20 - b^4: 1.60e+05, Number of cycles: 10, Time: 3.86 seconds




Number of buckets(b): 30 - b^4: 8.10e+05, Number of cycles: 10, Time: 13.02 seconds


                                                                                