In [2]:
# Install the client libraries this notebook imports: the Neo4j driver, pandas
# and the PostgreSQL adapter (psycopg2). `%pip` targets the running kernel's
# environment. pyspark is imported by the notebook as well but is not installed
# here -- presumably it ships with the kernel image; TODO confirm.
%pip install neo4j
%pip install pandas
%pip install psycopg2-binary

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [3]:
import os

import pandas as pd
import psycopg2
from neo4j import GraphDatabase
from pyspark.sql import SparkSession

# Connection settings are read from environment variables when present so that
# real credentials never need to be committed with the notebook. Each fallback
# is the original development value, so an unconfigured environment connects
# exactly as before.
conn = psycopg2.connect(
    dbname=os.environ.get("POSTGRES_DB", "db"),
    user=os.environ.get("POSTGRES_USER", "postgres"),
    password=os.environ.get("POSTGRES_PASSWORD", "password"),
    host=os.environ.get("POSTGRES_HOST", "postgres"),
    port=os.environ.get("POSTGRES_PORT", "5432"),
)

uri = os.environ.get("NEO4J_URI", "bolt://neo4j:7687")
username = os.environ.get("NEO4J_USERNAME", "neo4j")
password = os.environ.get("NEO4J_PASSWORD", "badpassword")

# Create the Postgres cursor shared by the cells that write result tables.
cursor = conn.cursor()

# Create the Neo4j driver; sessions are opened from it per query.
driver = GraphDatabase.driver(uri, auth=(username, password))

# Create (or reuse) the Spark session, running locally.
spark = SparkSession.builder.appName("Spark").master("local").getOrCreate()

In [4]:
def execute_cypher_query(query, parameters=None):
    """Run a Cypher query against Neo4j and return all of its records.

    Args:
        query: Cypher statement text.
        parameters: Optional mapping of Cypher parameter names to values,
            forwarded to ``session.run``. Prefer this over formatting values
            into ``query``. Defaults to ``None`` (no parameters), which keeps
            existing single-argument calls working unchanged.

    Returns:
        A list with every record produced by the query.
    """
    with driver.session() as session:
        # Materialize the result inside the `with` block so the records stay
        # available to the caller after the session has been closed.
        return list(session.run(query, parameters))

In [5]:
# Fetch every Customer -[TRANSACTION]-> Product relationship in one round trip.
# The returned records are keyed by the RETURN expressions exactly as written
# ("c.customer_id", "p.product_id", "t.standard_cost"); the aggregation cells
# in this notebook index the records with those keys, so the Cypher text must
# not be aliased or reformatted without updating them.
query = """
MATCH (c:Customer)-[t:TRANSACTION]->(p:Product) 
RETURN c.customer_id, p.product_id, t.standard_cost
"""
# Materialized list of records, reused by the aggregation cells.
query_output = execute_cypher_query(query)

In [6]:
# Total spend per customer
# GTPC: "Gasto total por cliente" (total spend per customer)

# --- Spark section ---

# Flatten the Neo4j records into plain dicts so pandas can build a frame.
GTPC_data = [
    {
        "customer_id": record["c.customer_id"],
        "product_id": record["p.product_id"],
        "standard_cost": record["t.standard_cost"],
    }
    for record in query_output
]

GTPC_df = pd.DataFrame(GTPC_data)
GTPC_spark_df = spark.createDataFrame(GTPC_df)

# Spark query: sum of standard_cost per customer, ordered by customer id.
GTPC_spark_query = (
    GTPC_spark_df.groupBy("customer_id")
    .sum("standard_cost")
    .withColumnRenamed("sum(standard_cost)", "total_cost")
    .orderBy("customer_id")
)

GTPC_spark_query.show()

GTPC_pandas_df = GTPC_spark_query.toPandas()

# --- PostgreSQL section ---

# Upsert instead of a plain INSERT: customer_id is the primary key, so
# re-running this cell against a database that already holds the rows would
# otherwise fail with a duplicate-key error.
insert_query = """
INSERT INTO Total_Spent_Per_Customer (customer_id, total_cost)
VALUES (%s, %s)
ON CONFLICT (customer_id) DO UPDATE SET total_cost = EXCLUDED.total_cost
"""

# Convert each value to a native Python type explicitly. DataFrame.to_numpy()
# on this mixed int/float frame produces a float array, which sent customer_id
# to the INT column as a float.
data = [
    (int(customer_id), float(total_cost))
    for customer_id, total_cost in GTPC_pandas_df[
        ["customer_id", "total_cost"]
    ].itertuples(index=False, name=None)
]

try:
    # Create the result table in Postgres if it does not exist yet.
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS Total_Spent_Per_Customer (
        customer_id INT PRIMARY KEY,
        total_cost FLOAT
    )
    """)
    conn.commit()

    # Write the aggregated rows.
    cursor.executemany(insert_query, data)
    conn.commit()
except Exception:
    # A failed statement leaves the shared connection in an aborted
    # transaction; roll back so later cells can keep using it, then re-raise.
    conn.rollback()
    raise


+-----------+------------------+
|customer_id|        total_cost|
+-----------+------------------+
|          1|           6066.36|
|          2|           1922.81|
|          3|           6525.42|
|          4|            827.15|
|          5|3508.2599999999998|
|          6|           1985.14|
|          7|            775.27|
|          8| 4957.820000000001|
|          9|           3004.44|
|         10|1645.4699999999998|
|         11|3140.6000000000004|
|         12|2854.1800000000003|
|         13|3397.3599999999997|
|         14|            795.49|
|         15|           3194.95|
|         16|           3241.95|
|         17|           2660.58|
|         18|           5335.24|
|         19|            1286.4|
|         20|           3010.58|
+-----------+------------------+
only showing top 20 rows



In [7]:
# Most purchased products
# PMC: "Productos más comprados" (most purchased products)

# --- Spark section ---

# Flatten the Neo4j records into plain dicts so pandas can build a frame.
PMC_data = [
    {
        "customer_id": record["c.customer_id"],
        "product_id": record["p.product_id"],
    }
    for record in query_output
]

PMC_df = pd.DataFrame(PMC_data)
PMC_spark_df = spark.createDataFrame(PMC_df)

# Spark query: number of transactions per product, most purchased first.
PMC_spark_query = (
    PMC_spark_df.groupBy("product_id")
    .count()
    .withColumnRenamed("count", "total_count")
    .orderBy("total_count", ascending=False)
)

PMC_spark_query.show()

PMC_pandas_df = PMC_spark_query.toPandas()

# --- PostgreSQL section ---

# Upsert instead of a plain INSERT: product_id is the primary key, so
# re-running this cell against a database that already holds the rows would
# otherwise fail with a duplicate-key error.
insert_query = """
INSERT INTO Product_Purchase_Count (product_id, total_count)
VALUES (%s, %s)
ON CONFLICT (product_id) DO UPDATE SET total_count = EXCLUDED.total_count
"""

# Select the columns by name and convert to native Python ints so the values
# line up with the placeholders regardless of frame column order.
data = [
    (int(product_id), int(total_count))
    for product_id, total_count in PMC_pandas_df[
        ["product_id", "total_count"]
    ].itertuples(index=False, name=None)
]

try:
    # Create the result table in Postgres if it does not exist yet.
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS Product_Purchase_Count (
        product_id INT PRIMARY KEY,
        total_count INT
    )
    """)
    conn.commit()

    # Write the aggregated rows.
    cursor.executemany(insert_query, data)
    conn.commit()
except Exception:
    # A failed statement leaves the shared connection in an aborted
    # transaction; roll back so later cells can keep using it, then re-raise.
    conn.rollback()
    raise

+----------+-----------+
|product_id|total_count|
+----------+-----------+
|         0|       1181|
|         3|        354|
|         1|        311|
|        35|        268|
|        38|        267|
|         4|        241|
|         2|        240|
|        90|        225|
|        12|        224|
|        80|        223|
|         5|        222|
|        43|        216|
|        77|        215|
|        27|        215|
|        69|        215|
|        53|        214|
|        45|        213|
|        92|        211|
|        36|        211|
|        50|        209|
+----------+-----------+
only showing top 20 rows



In [8]:
# Average spend per customer
# PGPC: "Promedio de gasto por cliente" (average spend per customer)

# --- Spark section ---

# Flatten the Neo4j records into plain dicts so pandas can build a frame.
PGPC_data = [
    {
        "customer_id": record["c.customer_id"],
        "product_id": record["p.product_id"],
        "standard_cost": record["t.standard_cost"],
    }
    for record in query_output
]

PGPC_df = pd.DataFrame(PGPC_data)
PGPC_spark_df = spark.createDataFrame(PGPC_df)

# Spark query: mean standard_cost per customer, ordered by customer id.
PGPC_spark_query = (
    PGPC_spark_df.groupBy("customer_id")
    .avg("standard_cost")
    .withColumnRenamed("avg(standard_cost)", "avg_cost")
    .orderBy("customer_id")
)

PGPC_spark_query.show()

PGPC_pandas_df = PGPC_spark_query.toPandas()

# --- PostgreSQL section ---

# Upsert instead of a plain INSERT: customer_id is the primary key, so
# re-running this cell against a database that already holds the rows would
# otherwise fail with a duplicate-key error.
insert_query = """
INSERT INTO Average_Spent_Per_Customer (customer_id, avg_cost)
VALUES (%s, %s)
ON CONFLICT (customer_id) DO UPDATE SET avg_cost = EXCLUDED.avg_cost
"""

# Convert each value to a native Python type explicitly. DataFrame.to_numpy()
# on this mixed int/float frame produces a float array, which sent customer_id
# to the INT column as a float.
data = [
    (int(customer_id), float(avg_cost))
    for customer_id, avg_cost in PGPC_pandas_df[
        ["customer_id", "avg_cost"]
    ].itertuples(index=False, name=None)
]

try:
    # Create the result table in Postgres if it does not exist yet.
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS Average_Spent_Per_Customer (
        customer_id INT PRIMARY KEY,
        avg_cost FLOAT
    )
    """)
    conn.commit()

    # Write the aggregated rows.
    cursor.executemany(insert_query, data)
    conn.commit()
except Exception:
    # A failed statement leaves the shared connection in an aborted
    # transaction; roll back so later cells can keep using it, then re-raise.
    conn.rollback()
    raise


+-----------+------------------+
|customer_id|          avg_cost|
+-----------+------------------+
|          1| 551.4872727272727|
|          2| 640.9366666666666|
|          3|          815.6775|
|          4|           413.575|
|          5| 584.7099999999999|
|          6|           397.028|
|          7|258.42333333333335|
|          8|495.78200000000004|
|          9|            500.74|
|         10|329.09399999999994|
|         11| 523.4333333333334|
|         12|407.74000000000007|
|         13| 485.3371428571428|
|         14|265.16333333333336|
|         15| 532.4916666666667|
|         16|            648.39|
|         17|           532.116|
|         18| 762.1771428571428|
|         19|             428.8|
|         20|           752.645|
+-----------+------------------+
only showing top 20 rows



In [9]:
# Purchase frequency per customer
# FCPC: "Frecuencia de compra por cliente" (purchase frequency per customer)

# --- Spark section ---

# Flatten the Neo4j records into plain dicts so pandas can build a frame.
FCPC_data = [
    {
        "customer_id": record["c.customer_id"],
        "product_id": record["p.product_id"],
    }
    for record in query_output
]

FCPC_df = pd.DataFrame(FCPC_data)
FCPC_spark_df = spark.createDataFrame(FCPC_df)

# Spark query: number of transactions per customer, ordered by customer id.
FCPC_spark_query = (
    FCPC_spark_df.groupBy("customer_id")
    .count()
    .withColumnRenamed("count", "total_transactions")
    .orderBy("customer_id")
)

FCPC_spark_query.show()

FCPC_pandas_df = FCPC_spark_query.toPandas()

# --- PostgreSQL section ---

# Upsert instead of a plain INSERT: customer_id is the primary key, so
# re-running this cell against a database that already holds the rows would
# otherwise fail with a duplicate-key error.
insert_query = """
INSERT INTO Transaction_Count_Per_Customer (customer_id, total_transactions)
VALUES (%s, %s)
ON CONFLICT (customer_id) DO UPDATE SET total_transactions = EXCLUDED.total_transactions
"""

# Select the columns by name and convert to native Python ints so the values
# line up with the placeholders regardless of frame column order.
data = [
    (int(customer_id), int(total_transactions))
    for customer_id, total_transactions in FCPC_pandas_df[
        ["customer_id", "total_transactions"]
    ].itertuples(index=False, name=None)
]

try:
    # Create the result table in Postgres if it does not exist yet.
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS Transaction_Count_Per_Customer (
        customer_id INT PRIMARY KEY,
        total_transactions INT
    )
    """)
    conn.commit()

    # Write the aggregated rows.
    cursor.executemany(insert_query, data)
    conn.commit()
except Exception:
    # A failed statement leaves the shared connection in an aborted
    # transaction; roll back so later cells can keep using it, then re-raise.
    conn.rollback()
    raise

+-----------+------------------+
|customer_id|total_transactions|
+-----------+------------------+
|          1|                11|
|          2|                 3|
|          3|                 8|
|          4|                 2|
|          5|                 6|
|          6|                 5|
|          7|                 3|
|          8|                10|
|          9|                 6|
|         10|                 5|
|         11|                 6|
|         12|                 7|
|         13|                 7|
|         14|                 3|
|         15|                 6|
|         16|                 5|
|         17|                 5|
|         18|                 7|
|         19|                 3|
|         20|                 4|
+-----------+------------------+
only showing top 20 rows

