In [0]:
# ------------------------------
# 1. Importamos librerias
# ------------------------------

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, avg, max, min, udf
from pyspark.sql.types import StringType
import re

In [0]:
# En databricks no es necesario crear la session
spark

<pyspark.sql.connect.session.SparkSession at 0xff646359bb90>

In [0]:
# ------------------------------
# 2. Carga de Datos desde Archivos
# ------------------------------
file_path = '/Volumes/workspace/retail/retail_csv/csv/'
customers = spark.read.option("header", True).csv(f"{file_path}/customers.csv")
departments = spark.read.option("header", True).csv(f"{file_path}/departments.csv")
categories = spark.read.option("header", True).csv(f"{file_path}/categories.csv")
products = spark.read.option("header", True).csv(f"{file_path}/products.csv")
orders = spark.read.option("header", True).csv(f"{file_path}/orders.csv")
order_items = spark.read.option("header", True).csv(f"{file_path}/order_items.csv")


In [0]:
# Esto es solo valido en Databricks, muestra el tipo de dato del esquema
customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_password: string (nullable = true)
 |-- customer_street: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_zipcode: string (nullable = true)



In [0]:
# Convertir algunos campos a los tipos adecuados
# El withColumn puede crear o editar en base a datos ya existentes.
customers = customers.withColumn("customer_id", col("customer_id").cast("int"))
products = products.withColumn("product_id", col("product_id").cast("int"))
order_items = order_items.withColumn("order_item_product_id", col("order_item_product_id").cast("int"))
order_items = order_items.withColumn("order_item_subtotal", col("order_item_subtotal").cast("float"))

In [0]:
# ------------------------------
# 3. Validaciones Básicas
# ------------------------------

# Verificar nulos en customers
customers.select([count(when(col(c).isNull(), c)).alias(c) for c in customers.columns]).show()

# Validar unicidad de customer_id
customers.groupBy("customer_id").count().filter("count > 1").show()


+-----------+--------------+--------------+--------------+-----------------+---------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+---------------+-------------+--------------+----------------+
|          0|             0|             0|             0|                0|              0|            0|             0|               0|
+-----------+--------------+--------------+--------------+-----------------+---------------+-------------+--------------+----------------+

+-----------+-----+
|customer_id|count|
+-----------+-----+
+-----------+-----+



In [0]:

# ------------------------------
# 4. Joins y Enriquecimiento
# ------------------------------

# Join entre products, categories y departments
products_full = products \
    .join(categories, products.product_category_id == categories.category_id, "left") \
    .join(departments, categories.category_department_id == departments.department_id, "left")

# Join entre orders y customers
orders_customers = orders.join(customers, orders.order_customer_id == customers.customer_id, "left")

# Join entre order_items y products
order_details = order_items.join(products, order_items.order_item_product_id == products.product_id, "left")


In [0]:
# En databricks es posible tener el dataframe como si fuera una hoja en excel
display(order_details)

# Otras formas de ver datos son:
# display(order_details)
# order_details.show()

order_item_id,order_item_order_id,order_item_product_id,order_item_quantity,order_item_subtotal,order_item_product_price,product_id,product_category_id,product_name,product_description,product_price,product_image
1,1,957,1,299.98,299.98,957,43,Diamondback Women's Serene Classic Comfort Bi,,299.98,http://images.acmesports.sports/Diamondback+Women%27s+Serene+Classic+Comfort+Bike+2014
2,2,1073,1,199.99,199.99,1073,48,Pelican Sunstream 100 Kayak,,199.99,http://images.acmesports.sports/Pelican+Sunstream+100+Kayak
3,2,502,5,250.0,50.0,502,24,Nike Men's Dri-FIT Victory Golf Polo,,50.0,http://images.acmesports.sports/Nike+Men%27s+Dri-FIT+Victory+Golf+Polo
4,2,403,1,129.99,129.99,403,18,Nike Men's CJ Elite 2 TD Football Cleat,,129.99,http://images.acmesports.sports/Nike+Men%27s+CJ+Elite+2+TD+Football+Cleat
5,4,897,2,49.98,24.99,897,40,Team Golf New England Patriots Putter Grip,,24.99,http://images.acmesports.sports/Team+Golf+New+England+Patriots+Putter+Grip
6,4,365,5,299.95,59.99,365,17,Perfect Fitness Perfect Rip Deck,,59.99,http://images.acmesports.sports/Perfect+Fitness+Perfect+Rip+Deck
7,4,502,3,150.0,50.0,502,24,Nike Men's Dri-FIT Victory Golf Polo,,50.0,http://images.acmesports.sports/Nike+Men%27s+Dri-FIT+Victory+Golf+Polo
8,4,1014,4,199.92,49.98,1014,46,O'Brien Men's Neoprene Life Vest,,49.98,http://images.acmesports.sports/O%27Brien+Men%27s+Neoprene+Life+Vest
9,5,957,1,299.98,299.98,957,43,Diamondback Women's Serene Classic Comfort Bi,,299.98,http://images.acmesports.sports/Diamondback+Women%27s+Serene+Classic+Comfort+Bike+2014
10,5,365,5,299.95,59.99,365,17,Perfect Fitness Perfect Rip Deck,,59.99,http://images.acmesports.sports/Perfect+Fitness+Perfect+Rip+Deck


Databricks visualization. Run in Databricks to view.

In [0]:
# ------------------------------
# 5. Agregaciones
# ------------------------------

# Ventas por producto
ventas_por_producto = order_details.groupBy("product_name").sum("order_item_subtotal")
ventas_por_producto.orderBy("sum(order_item_subtotal)", ascending=False).show(10)

# Ventas por departamento
order_details_full = order_details \
    .join(products_full, "product_id", "left")

ventas_por_departamento = order_details_full \
    .groupBy("department_name").sum("order_item_subtotal") \
    .orderBy("sum(order_item_subtotal)", ascending=False)

ventas_por_departamento.show(10)


+--------------------+------------------------+
|        product_name|sum(order_item_subtotal)|
+--------------------+------------------------+
|Field & Stream Sp...|       6929653.690338135|
|Perfect Fitness P...|        4421143.14352417|
|Diamondback Women...|       4118425.570831299|
|Nike Men's Free 5...|       3667633.196662903|
|Nike Men's Dri-FI...|               3147800.0|
|Pelican Sunstream...|       3099845.085144043|
|Nike Men's CJ Eli...|      2891757.6622009277|
|O'Brien Men's Neo...|        2888993.91355896|
|Under Armour Girl...|      1269082.6712722778|
|adidas Youth Germ...|                 67830.0|
+--------------------+------------------------+
only showing top 10 rows
+---------------+------------------------+
|department_name|sum(order_item_subtotal)|
+---------------+------------------------+
|       Fan Shop|    1.7107766279556274E7|
|        Apparel|       7323700.445373535|
|           Golf|       4609028.242256165|
|       Footwear|       4006498.767742157|
| 

In [0]:
# ------------------------------
# 6. Clasificación de Estados
# ------------------------------

def clasificar_estado(estado):
    if estado in ["CLOSED", "COMPLETE"]:
        return "Finalizado"
    elif estado in ["PENDING", "PROCESSING"]:
        return "En Proceso"
    else:
        return "Otro"
# StringType() es el tipo de dato que esta retornando de la función
clasificar_udf = udf(clasificar_estado, StringType())
orders_customers = orders_customers.withColumn("estado_clasificado", clasificar_udf(col("order_status")))

orders_customers.groupBy("estado_clasificado").count().show()


+------------------+-----+
|estado_clasificado|count|
+------------------+-----+
|        Finalizado|30455|
|              Otro|22543|
|        En Proceso|15885|
+------------------+-----+



In [0]:
# Validar correos electrónicos
validar_email = udf(lambda x: bool(re.match(r"[^@]+@[^@]+\.[^@]+", str(x))) if x else False, StringType())
customers = customers.withColumn("email_valido", validar_email(col("customer_email")))
customers.groupBy("email_valido").count().show()

# Verificar existencia de productos referenciados
referencias_invalidas = order_items.join(products, order_items.order_item_product_id == products.product_id, "left_anti")
print("Productos en order_items no encontrados en products:")
referencias_invalidas.show()


+------------+-----+
|email_valido|count|
+------------+-----+
|       false|12435|
+------------+-----+

Productos en order_items no encontrados en products:
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+

