In [5]:
#Configuración inicial y verificación del entorno

import pyspark.sql.functions as F
from pyspark.sql.types import *
from notebookutils import mssparkutils
import pandas as pd

# Verificar versión de Spark
print(f"Versión de Spark: {spark.version}")

StatementMeta(, 2177e04b-b356-4bf4-ad6e-9ee8755c5905, 7, Finished, Available, Finished)

Versión de Spark: 3.5.1.5.4.20250519.1


In [2]:
# Leer los archivos CSV  y inferimos esquemas

try:
    # Leer archivo de sales
    df_sales = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("Files/Bronze/sales.csv")
    
    # Leer archivo de brokers
    df_brokers = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("Files/Bronze/brokers.csv")

    # Leer archivo de campaigns
    df_campaigns = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("Files/Bronze/campaigns.csv")

    # Leer archivo de clients
    df_clients = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("Files/Bronze/clients.csv")

    # Leer archivo de leads
    df_leads = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("Files/Bronze/leads.csv")

    # Leer archivo de projects
    df_projects = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("Files/Bronze/projects.csv")

    # Leer archivo de properties
    df_properties = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("Files/Bronze/properties.csv")
    
   
    
    print("✅ Archivos CSV leídos correctamente")
except Exception as e:
    print(f"Error al leer archivos: {str(e)}")

StatementMeta(, 2177e04b-b356-4bf4-ad6e-9ee8755c5905, 4, Finished, Available, Finished)

✅ Archivos CSV leídos correctamente


In [3]:
# Exploración de los datos
# Mostramos una vista previa de los datos cargados

print("Vista previa de los datos de sales:")
display(df_sales.limit(5))

print("Vista previa de los datos de brokers:")
display(df_brokers.limit(5))

print("Vista previa de los datos de campaigns:")
display(df_campaigns.limit(5))

print("Vista previa de los datos de clients:")
display(df_clients.limit(5))

print("Vista previa de los datos de leads:")
display(df_leads.limit(5))

print("Vista previa de los datos de projects:")
display(df_projects.limit(5))

print("Vista previa de los datos de properties:")
display(df_properties.limit(5))


StatementMeta(, 2177e04b-b356-4bf4-ad6e-9ee8755c5905, 5, Finished, Available, Finished)

Vista previa de los datos de sales:


SynapseWidget(Synapse.DataFrame, cb574b7e-eed8-4c7a-94d8-4282a510a82d)

Vista previa de los datos de brokers:


SynapseWidget(Synapse.DataFrame, 47acb130-18b1-419d-8942-cf12e8aee740)

Vista previa de los datos de campaigns:


SynapseWidget(Synapse.DataFrame, e08bd79a-1c7c-4144-83ef-50422bf444e2)

Vista previa de los datos de clients:


SynapseWidget(Synapse.DataFrame, 5b245c83-9be0-4089-9314-d0e3810db8b8)

Vista previa de los datos de leads:


SynapseWidget(Synapse.DataFrame, bf27903b-f7d7-49da-b775-fa660e98d202)

Vista previa de los datos de projects:


SynapseWidget(Synapse.DataFrame, 84a68203-f9c3-486f-a9b2-6d38d8c729f8)

Vista previa de los datos de properties:


SynapseWidget(Synapse.DataFrame, 5980789c-0552-4ce7-8411-85f1ff308cfe)

In [6]:

# Realizar transformaciones en los dataframes
# Aplicamos transformaciones básicas para mejorar la calidad de los datos

# Transformación de clients: Añadir columna nombre_completo
df_clients_procesado = df_clients.withColumn(
    "Full_Name", 
    F.concat(F.col("FirstName"), F.lit(" "), F.col("LastName"))
)

# Transformación de tiendas: Extraer el año de apertura
df_leads_procesado = df_leads.withColumn(
    "año", 
    F.year(F.col("LeadDate"))
)


print("✅ Transformaciones aplicadas correctamente")



StatementMeta(, 2177e04b-b356-4bf4-ad6e-9ee8755c5905, 8, Finished, Available, Finished)

✅ Transformaciones aplicadas correctamente


In [7]:
# Mostrar ejemplos de las transformaciones
print("Ejemplo de clientes procesados:")
display(df_clients_procesado.limit(5))

print("Ejemplo de leads procesados:")
display(df_leads_procesado.limit(5))


StatementMeta(, 2177e04b-b356-4bf4-ad6e-9ee8755c5905, 9, Finished, Available, Finished)

Ejemplo de clientes procesados:


SynapseWidget(Synapse.DataFrame, fea7a716-9cfc-4017-a1e5-6caa23fa7c89)

Ejemplo de leads procesados:


SynapseWidget(Synapse.DataFrame, a3448c5f-2712-406a-a0fa-4623fca0cbd3)

In [8]:

# Guardar los dataframes como tablas Delta
# Guardamos los dataframes procesados como tablas Delta en el lakehouse


try:
    # Guardar sales
    df_sales.write.format("delta").mode("overwrite").save("Files/Silver/sales_delta")
    
    # Guardar brokers
    df_brokers.write.format("delta").mode("overwrite").save("Files/Silver/brokers_delta")
    
    # Guardar campaigns
    df_campaigns.write.format("delta").mode("overwrite").save("Files/Silver/campaigns_delta")
    
    # Guardar df_clients_procesado
    df_clients_procesado.write.format("delta").mode("overwrite").save("Files/Silver/clients_delta")
    
    # Guardar leads
    df_leads_procesado.write.format("delta").mode("overwrite").save("Files/Silver/leads_delta")

    # Guardar projects
    df_projects.write.format("delta").mode("overwrite").save("Files/Silver/projects_delta")

    # Guardar properties
    df_properties.write.format("delta").mode("overwrite").save("Files/Silver/properties_delta")
    
    print("✅ Dataframes guardados como archivos Delta correctamente")
except Exception as e:
    print(f"Error al guardar archivos Delta: {str(e)}")


StatementMeta(, 2177e04b-b356-4bf4-ad6e-9ee8755c5905, 10, Finished, Available, Finished)

✅ Dataframes guardados como archivos Delta correctamente


In [9]:
# Registrar tablas como vistas temporales en la sesión actual

try:
    # Registrar tablas como vistas temporales
    df_sales.createOrReplaceTempView("fact_sales")
    df_brokers.createOrReplaceTempView("dim_brokers")
    df_campaigns.createOrReplaceTempView("dim_campaigns")
    df_clients_procesado.createOrReplaceTempView("dim_clients")
    df_leads_procesado.createOrReplaceTempView("fact_leads")
    df_projects.createOrReplaceTempView("dim_projects")
    df_properties.createOrReplaceTempView("dim_properties")
    
    # Verificar que se han creado correctamente
    tables = spark.sql("SHOW TABLES").collect()
    print("Tablas disponibles en la sesión:")
    for table in tables:
        print(f" - {table.tableName}")
    
    print("✅ Vistas temporales creadas correctamente para la sesión actual")
    
    # Alternativa: Guardar como tablas directamente en el Lakehouse 
    # Esto registra las tablas en el catálogo del Lakehouse actual
    print("\nRegistrando tablas en el catálogo del Lakehouse...")
    
    # Registrar los dataframes como tablas en el Lakehouse actual parte Tables
    df_sales.write.format("delta").mode("overwrite").saveAsTable("fact_sales")
    df_brokers.write.format("delta").mode("overwrite").saveAsTable("dim_brokers")
    df_campaigns.write.format("delta").mode("overwrite").saveAsTable("dim_campaigns")
    df_clients_procesado.write.format("delta").mode("overwrite").saveAsTable("dim_clients")
    df_leads_procesado.write.format("delta").mode("overwrite").saveAsTable("fact_leads")
    df_projects.write.format("delta").mode("overwrite").saveAsTable("dim_projects")
    df_properties.write.format("delta").mode("overwrite").saveAsTable("dim_properties")
    
    print("✅ Tablas creadas correctamente en el catálogo del Lakehouse")
except Exception as e:
    print(f"Error al crear tablas: {str(e)}")

df = spark.sql("SELECT * FROM LKH_DATAPATH_JCA_PROJECT_FINAL.fact_sales LIMIT 1000")
display(df)

df = spark.sql("SELECT * FROM LKH_DATAPATH_JCA_PROJECT_FINAL.dim_clients LIMIT 1000")
display(df)

StatementMeta(, 2177e04b-b356-4bf4-ad6e-9ee8755c5905, 11, Finished, Available, Finished)

Tablas disponibles en la sesión:
 - dim_brokers
 - dim_campaigns
 - dim_clients
 - dim_projects
 - dim_properties
 - fact_leads
 - fact_sales
✅ Vistas temporales creadas correctamente para la sesión actual

Registrando tablas en el catálogo del Lakehouse...
✅ Tablas creadas correctamente en el catálogo del Lakehouse


SynapseWidget(Synapse.DataFrame, ce431cac-d597-44f6-90a2-7341b9c17f9d)

SynapseWidget(Synapse.DataFrame, e7db6ae7-03f8-4a79-9ece-1f0e84ef52a7)

In [None]:
# Consultar el modelo estrella completo
# Realizamos una consulta SQL que une todas las tablas del modelo estrella

query = """
SELECT 
    v.id_venta,
    t.fecha,
    t.tipo_dia,
    c.nombre_completo AS cliente,
    c.ciudad AS ciudad_cliente,
    p.nombre AS producto,
    p.categoria,
    p.subcategoria,
    ti.nombre AS tienda,
    ti.ciudad AS ciudad_tienda,
    v.cantidad,
    v.precio_unitario,
    v.descuento,
    v.total,
    v.porcentaje_descuento
FROM 
    fact_ventas v
JOIN 
    dim_tiempo t ON v.id_fecha = t.id_fecha
JOIN 
    dim_clientes c ON v.id_cliente = c.id_cliente
JOIN 
    dim_productos p ON v.id_producto = p.id_producto
JOIN 
    dim_tiendas ti ON v.id_tienda = ti.id_tienda
ORDER BY 
    v.id_venta
"""

try:
    # Ejecutar la consulta
    resultado = spark.sql(query)
    
    # Mostrar los resultados
    print("Consulta del modelo estrella completo:")
    display(resultado)
    
    print("✅ Consulta ejecutada correctamente")
except Exception as e:
    print(f"Error al ejecutar la consulta: {str(e)}")