In [1]:
# ===== VERSIÓN LOCAL PARA macOS =====
# (No necesitas sudo, apt, wget, etc. en Mac)

# Solo instalar PySpark (ya lo tienes)
# !pip install pyspark

import os
import sys

# Configuración para macOS (en lugar de las rutas de Linux)
os.environ["JAVA_HOME"] = "/usr"  # Tu Java está aquí
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

print("✅ Variables configuradas para macOS")

# No necesitas descargar Spark manualmente, PySpark ya lo incluye
from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

# Misma configuración que Colab pero adaptada
spark = SparkSession \
    .builder \
    .appName("Ejercicio de Individual") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

print("🔥 ¡Spark funcionando localmente!")
spark

✅ Variables configuradas para macOS


KeyboardInterrupt: 

In [6]:
from pyspark.sql import Row

# Clientes: ClienteID, Nombre, Edad, Ciudad
clientes_data = [
    Row(ClienteID=1, Nombre="Ana",    Edad=23, Ciudad="CDMX"),
    Row(ClienteID=2, Nombre="Luis",   Edad=29, Ciudad="Guadalajara"),
    Row(ClienteID=3, Nombre="Sofía",  Edad=21, Ciudad="Monterrey"),
    Row(ClienteID=4, Nombre="Diego",  Edad=34, Ciudad="CDMX"),
    Row(ClienteID=5, Nombre="María",  Edad=27, Ciudad="Puebla")
]
df_clientes = spark.createDataFrame(clientes_data)
df_clientes.show()

# Productos: ProductoID, Producto, Categoría, Precio
productos_data = [
    Row(ProductoID=10, Producto="Teclado",    Categoria="Accesorios", Precio=350.0),
    Row(ProductoID=11, Producto="Mouse",      Categoria="Accesorios", Precio=250.0),
    Row(ProductoID=12, Producto="Monitor",    Categoria="Pantallas",  Precio=4200.0),
    Row(ProductoID=13, Producto="Laptop",     Categoria="Cómputo",    Precio=18500.0),
    Row(ProductoID=14, Producto="Audífonos",  Categoria="Audio",      Precio=900.0)
]
df_productos = spark.createDataFrame(productos_data)
df_productos.show()

# Compras: CompraID, ClienteID, ProductoID, Cantidad
compras_data = [
    Row(CompraID=100, ClienteID=1, ProductoID=11, Cantidad=2),
    Row(CompraID=101, ClienteID=2, ProductoID=12, Cantidad=1),
    Row(CompraID=102, ClienteID=2, ProductoID=10, Cantidad=1),
    Row(CompraID=103, ClienteID=3, ProductoID=14, Cantidad=3),
    Row(CompraID=104, ClienteID=4, ProductoID=13, Cantidad=1),
    Row(CompraID=105, ClienteID=5, ProductoID=11, Cantidad=1),
    Row(CompraID=106, ClienteID=5, ProductoID=12, Cantidad=2)
]
df_compras = spark.createDataFrame(compras_data)
df_compras.show()

NameError: name 'spark' is not defined

In [None]:
# CONFIGURACIÓN SIMPLE CON PYSPARK
import os

# Configurar Java (ya tienes Java 24 instalado)
os.environ['JAVA_HOME'] = "/usr"

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("BigData") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

print("🔥 Spark funcionando!")
spark

In [44]:
# =========================
# JOIN de las 3 tablas
# =========================
# Queremos ver: ClienteID, Ciudad, Producto, Categoría, Cantidad y el Importe total de la compra.
from pyspark.sql.functions import col
df_join = df_compras.join(df_productos, on="ProductoID", how="inner")\
              .join(df_clientes, on="ClienteID", how="inner")\
              .select(
                 col("ClienteID"),
                 col("Ciudad"),
                 col("Producto"),
                 col("Categoria"),
                 col("Cantidad"),
                 (col("Cantidad") * col("Precio")).alias("Importe Total"),
                 col("Edad"),
                 col("Precio")
              )


df_join.printSchema()
df_join.show()

root
 |-- ClienteID: long (nullable = true)
 |-- Ciudad: string (nullable = true)
 |-- Producto: string (nullable = true)
 |-- Categoria: string (nullable = true)
 |-- Cantidad: long (nullable = true)
 |-- Importe Total: double (nullable = true)
 |-- Edad: long (nullable = true)
 |-- Precio: double (nullable = true)

+---------+-----------+---------+----------+--------+-------------+----+-------+
|ClienteID|     Ciudad| Producto| Categoria|Cantidad|Importe Total|Edad| Precio|
+---------+-----------+---------+----------+--------+-------------+----+-------+
|        1|       CDMX|    Mouse|Accesorios|       2|        500.0|  23|  250.0|
|        2|Guadalajara|  Monitor| Pantallas|       1|       4200.0|  29| 4200.0|
|        2|Guadalajara|  Teclado|Accesorios|       1|        350.0|  29|  350.0|
|        5|     Puebla|  Monitor| Pantallas|       2|       8400.0|  27| 4200.0|
|        5|     Puebla|    Mouse|Accesorios|       1|        250.0|  27|  250.0|
|        3|  Monterrey|Audífonos|

El objetivo será:

Filtrar clientes mayores de 25 años.

Filtrar compras solo de productos cuyo precio sea mayor a 1000.

Agrupar por ciudad y calcular el gasto total de los clientes de esa ciudad.

In [45]:
# =========================
# WHERE: clientes mayores de 25 años
# =========================
df_join.where(col("Edad")>25).show()

# =========================
# FILTER: compras con productos de precio > 1000
# =========================
df_join.filter(col("Precio")>1000).show()



+---------+-----------+--------+----------+--------+-------------+----+-------+
|ClienteID|     Ciudad|Producto| Categoria|Cantidad|Importe Total|Edad| Precio|
+---------+-----------+--------+----------+--------+-------------+----+-------+
|        2|Guadalajara| Monitor| Pantallas|       1|       4200.0|  29| 4200.0|
|        2|Guadalajara| Teclado|Accesorios|       1|        350.0|  29|  350.0|
|        5|     Puebla| Monitor| Pantallas|       2|       8400.0|  27| 4200.0|
|        5|     Puebla|   Mouse|Accesorios|       1|        250.0|  27|  250.0|
|        4|       CDMX|  Laptop|   Cómputo|       1|      18500.0|  34|18500.0|
+---------+-----------+--------+----------+--------+-------------+----+-------+

+---------+-----------+--------+---------+--------+-------------+----+-------+
|ClienteID|     Ciudad|Producto|Categoria|Cantidad|Importe Total|Edad| Precio|
+---------+-----------+--------+---------+--------+-------------+----+-------+
|        2|Guadalajara| Monitor|Pantallas|

In [46]:
from pyspark.sql.functions import col, sum

# =========================
# 3) GROUP BY: gasto total por ciudad
# =========================

df_join.groupBy(col("Ciudad")).agg(sum("Importe Total").alias("Gasto Total")).show()

+-----------+-----------+
|     Ciudad|Gasto Total|
+-----------+-----------+
|  Monterrey|     2700.0|
|Guadalajara|     4550.0|
|     Puebla|     8650.0|
|       CDMX|    19000.0|
+-----------+-----------+



#Ejercicio “360° de Ventas”

Datasets (los mismos que definimos: df_clientes, df_productos, df_compras).

Objetivo

Construir, paso a paso, un reporte final por Ciudad x Categoría con:

Total de ventas (suma de Cantidad * Precio)

Ticket promedio por compra en esa combinación

Número de clientes distintos que compraron

Producto más vendido por cantidad dentro de la categoría (sin usar ventanas; solo groupBy + join)

Cliente top por gasto dentro de esa categoría (también sin ventanas)

Nota: “más vendido” = mayor cantidad total. “Top por gasto” = mayor suma de importe.

In [47]:
from pyspark.sql import functions as F

# =========================
# 0) Punto de partida
# =========================
# Se asume que ya tienes:
# df_clientes (ClienteID, Nombre, Edad, Ciudad)
# df_productos (ProductoID, Producto, Categoria, Precio)
# df_compras  (CompraID, ClienteID, ProductoID, Cantidad)

# Chequeo rápido
# df_clientes.show(); df_productos.show(); df_compras.show()

# =========================
# 1) Enriquecer compras con precio e info de cliente/producto
# =========================


df_enriquecer = df_clientes.join(df_compras,on="ClienteID",how="inner")

df_enriquecer = df_enriquecer.join(df_productos,on="ProductoID",how="inner")

df_enriquecerimporte = df_enriquecer.withColumn("Importe total", col("Cantidad")*col("Precio"))


#withcolumn agrega nueva columna

df_enriquecerimporte.show()

+----------+---------+------+----+-----------+--------+--------+---------+----------+-------+-------------+
|ProductoID|ClienteID|Nombre|Edad|     Ciudad|CompraID|Cantidad| Producto| Categoria| Precio|Importe total|
+----------+---------+------+----+-----------+--------+--------+---------+----------+-------+-------------+
|        10|        2|  Luis|  29|Guadalajara|     102|       1|  Teclado|Accesorios|  350.0|        350.0|
|        11|        5| María|  27|     Puebla|     105|       1|    Mouse|Accesorios|  250.0|        250.0|
|        11|        1|   Ana|  23|       CDMX|     100|       2|    Mouse|Accesorios|  250.0|        500.0|
|        12|        5| María|  27|     Puebla|     106|       2|  Monitor| Pantallas| 4200.0|       8400.0|
|        12|        2|  Luis|  29|Guadalajara|     101|       1|  Monitor| Pantallas| 4200.0|       4200.0|
|        13|        4| Diego|  34|       CDMX|     104|       1|   Laptop|   Cómputo|18500.0|      18500.0|
|        14|        3| Sofía

In [48]:

# =========================
# 2) Filtrar calidad de datos (opcional, pero parte del ejercicio)
# - compras con Cantidad > 0 y Precio > 0
# - edades razonables (>= 18)
# =========================


from pyspark.sql.functions import col

df_cantidad_mayor_0 = df_enriquecer.where(
    (col("Cantidad") > 0) & (col("Precio") > 0) & (col("Edad") >= 18)
)

df_cantidad_mayor_0.show()



+----------+---------+------+----+-----------+--------+--------+---------+----------+-------+
|ProductoID|ClienteID|Nombre|Edad|     Ciudad|CompraID|Cantidad| Producto| Categoria| Precio|
+----------+---------+------+----+-----------+--------+--------+---------+----------+-------+
|        10|        2|  Luis|  29|Guadalajara|     102|       1|  Teclado|Accesorios|  350.0|
|        11|        5| María|  27|     Puebla|     105|       1|    Mouse|Accesorios|  250.0|
|        11|        1|   Ana|  23|       CDMX|     100|       2|    Mouse|Accesorios|  250.0|
|        12|        5| María|  27|     Puebla|     106|       2|  Monitor| Pantallas| 4200.0|
|        12|        2|  Luis|  29|Guadalajara|     101|       1|  Monitor| Pantallas| 4200.0|
|        13|        4| Diego|  34|       CDMX|     104|       1|   Laptop|   Cómputo|18500.0|
|        14|        3| Sofía|  21|  Monterrey|     103|       3|Audífonos|     Audio|  900.0|
+----------+---------+------+----+-----------+--------+-----

In [49]:
# =========================
# 3) Métricas base por Ciudad x Categoria
# - Ventas totales
# - Ticket promedio (Importe promedio por compra)
# - Clientes distintos
# =========================

df_metricas = df_enriquecerimporte.groupBy("Ciudad", "Categoria").agg(
    F.sum("Importe total").alias("Ventas Totales"),
    F.avg("Importe total").alias("Ticket Promedio"),
    F.countDistinct("ClienteID").alias("Clientes Distintos")
)

df_metricas.show()

+-----------+----------+--------------+---------------+------------------+
|     Ciudad| Categoria|Ventas Totales|Ticket Promedio|Clientes Distintos|
+-----------+----------+--------------+---------------+------------------+
|       CDMX|   Cómputo|       18500.0|        18500.0|                 1|
|  Monterrey|     Audio|        2700.0|         2700.0|                 1|
|Guadalajara|Accesorios|         350.0|          350.0|                 1|
|     Puebla|Accesorios|         250.0|          250.0|                 1|
|     Puebla| Pantallas|        8400.0|         8400.0|                 1|
|Guadalajara| Pantallas|        4200.0|         4200.0|                 1|
|       CDMX|Accesorios|         500.0|          500.0|                 1|
+-----------+----------+--------------+---------------+------------------+



In [50]:
# =========================
# 4) Producto más vendido (por cantidad) en cada Ciudad x Categoria
#    4.1) Cantidad total por Ciudad x Categoria x Producto
#    4.2) Máximo por Ciudad x Categoria
#    4.3) Join para quedarnos con el/los producto(s) que logran ese máximo
# =========================



In [51]:
# =========================
# 4) Producto más vendido (por cantidad) en cada Ciudad x Categoria
#    4.1) Cantidad total por Ciudad x Categoria x Producto
# =========================

df_cantidad_tot= df_enriquecerimporte.groupBy("Ciudad", "Categoria", "Producto").agg(
    F.sum("Cantidad").alias("Cantidad Total")
)

df_cantidad_tot.show()

+-----------+----------+---------+--------------+
|     Ciudad| Categoria| Producto|Cantidad Total|
+-----------+----------+---------+--------------+
|Guadalajara|Accesorios|  Teclado|             1|
|Guadalajara| Pantallas|  Monitor|             1|
|     Puebla| Pantallas|  Monitor|             2|
|  Monterrey|     Audio|Audífonos|             3|
|     Puebla|Accesorios|    Mouse|             1|
|       CDMX|   Cómputo|   Laptop|             1|
|       CDMX|Accesorios|    Mouse|             2|
+-----------+----------+---------+--------------+



In [52]:
#    4.2) Máximo por Ciudad x Categoria
# =========================

df_max_cantidad_categoria = df_cantidad_tot.groupBy("Ciudad", "Categoria").agg(
    F.max("Cantidad Total").alias("Máximo por Ciudad x Categoria")
)

df_max_cantidad_categoria.show()

+-----------+----------+-----------------------------+
|     Ciudad| Categoria|Máximo por Ciudad x Categoria|
+-----------+----------+-----------------------------+
|       CDMX|   Cómputo|                            1|
|  Monterrey|     Audio|                            3|
|Guadalajara|Accesorios|                            1|
|     Puebla|Accesorios|                            1|
|     Puebla| Pantallas|                            2|
|Guadalajara| Pantallas|                            1|
|       CDMX|Accesorios|                            2|
+-----------+----------+-----------------------------+



In [53]:
#    4.3) Join para quedarnos con el/los producto(s) que logran ese máximo
# =========================

df_producto_mas_vendido = df_cantidad_tot.join(
    df_max_cantidad_categoria,on=["Ciudad", "Categoria"],how="inner"
).where(
    df_cantidad_tot["Cantidad Total"] == df_max_cantidad_categoria["Máximo por Ciudad x Categoria"]
).select(
    df_cantidad_tot["Producto"],
    df_cantidad_tot["Ciudad"],
    df_cantidad_tot["Categoria"],

    df_cantidad_tot["Cantidad Total"].alias("Cantidad Mas Vendida")
)

df_producto_mas_vendido.show()

+---------+-----------+----------+--------------------+
| Producto|     Ciudad| Categoria|Cantidad Mas Vendida|
+---------+-----------+----------+--------------------+
|  Teclado|Guadalajara|Accesorios|                   1|
|  Monitor|Guadalajara| Pantallas|                   1|
|  Monitor|     Puebla| Pantallas|                   2|
|Audífonos|  Monterrey|     Audio|                   3|
|    Mouse|     Puebla|Accesorios|                   1|
|   Laptop|       CDMX|   Cómputo|                   1|
|    Mouse|       CDMX|Accesorios|                   2|
+---------+-----------+----------+--------------------+



In [54]:
# =========================
# 5) Cliente top por gasto dentro de cada Ciudad x Categoria
#    5.1) Gasto por Ciudad x Categoria x Cliente
#    5.2) Máximo gasto por Ciudad x Categoria
#    5.3) Join para recuperar el/los clientes top
# =========================



In [55]:
# =========================
# 5) Cliente top por gasto dentro de cada Ciudad x Categoria
#    5.1) Gasto por Ciudad x Categoria x Cliente
# =========================

df_gasto_por_cliente = df_enriquecerimporte.groupBy("Ciudad", "Categoria", "ClienteID", "Nombre").agg(
    F.sum("Importe total").alias("Gasto Total Cliente")
)

df_gasto_por_cliente.show()

+-----------+----------+---------+------+-------------------+
|     Ciudad| Categoria|ClienteID|Nombre|Gasto Total Cliente|
+-----------+----------+---------+------+-------------------+
|       CDMX|   Cómputo|        4| Diego|            18500.0|
|       CDMX|Accesorios|        1|   Ana|              500.0|
|Guadalajara| Pantallas|        2|  Luis|             4200.0|
|Guadalajara|Accesorios|        2|  Luis|              350.0|
|     Puebla| Pantallas|        5| María|             8400.0|
|  Monterrey|     Audio|        3| Sofía|             2700.0|
|     Puebla|Accesorios|        5| María|              250.0|
+-----------+----------+---------+------+-------------------+



In [56]:
#    5.2) Máximo gasto por Ciudad x Categoria
# =========================

df_max_gasto_categoria = df_gasto_por_cliente.groupBy("Ciudad", "Categoria").agg(
    F.max("Gasto Total Cliente").alias("Max Gasto x Categoria")
)

df_max_gasto_categoria.show()

+-----------+----------+---------------------+
|     Ciudad| Categoria|Max Gasto x Categoria|
+-----------+----------+---------------------+
|       CDMX|   Cómputo|              18500.0|
|  Monterrey|     Audio|               2700.0|
|Guadalajara|Accesorios|                350.0|
|     Puebla|Accesorios|                250.0|
|     Puebla| Pantallas|               8400.0|
|Guadalajara| Pantallas|               4200.0|
|       CDMX|Accesorios|                500.0|
+-----------+----------+---------------------+



In [57]:
#    5.3) Join para recuperar el/los clientes top
# =========================

df_cliente_top_gasto = df_gasto_por_cliente.join(
    df_max_gasto_categoria,
    on=["Ciudad", "Categoria"],
    how="inner"
).where(
    F.col("Gasto Total Cliente") == F.col("Max Gasto x Categoria")
).select(
    df_gasto_por_cliente["Nombre"],
    df_gasto_por_cliente["Ciudad"],
    df_gasto_por_cliente["Categoria"],
    df_gasto_por_cliente["ClienteID"],
    df_gasto_por_cliente["Gasto Total Cliente"].alias("Top Cliente")
)

df_cliente_top_gasto.show()

+------+-----------+----------+---------+-----------+
|Nombre|     Ciudad| Categoria|ClienteID|Top Cliente|
+------+-----------+----------+---------+-----------+
| Diego|       CDMX|   Cómputo|        4|    18500.0|
|   Ana|       CDMX|Accesorios|        1|      500.0|
|  Luis|Guadalajara| Pantallas|        2|     4200.0|
|  Luis|Guadalajara|Accesorios|        2|      350.0|
| María|     Puebla| Pantallas|        5|     8400.0|
| Sofía|  Monterrey|     Audio|        3|     2700.0|
| María|     Puebla|Accesorios|        5|      250.0|
+------+-----------+----------+---------+-----------+

