In [73]:
from pyspark.sql import SparkSession
import os
import ipywidgets as widgets
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import display
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import col
from pyspark.sql.functions import round
import re



# Inicializar Spark con soporte para PostgreSQL JDBC
def get_spark_session():
    jar_path = os.path.abspath("postgresql-42.7.5.jar")  # Ruta del driver JDBC

    return SparkSession.builder \
        .appName("Business Intelligence") \
        .config("spark.jars", jar_path) \
        .config("spark.driver.extraClassPath", jar_path) \
        .getOrCreate()

# Obtener conexión PostgreSQL desde Spark
def get_postgres_connection():
    url = "jdbc:postgresql://localhost:5432/postgres"
    properties = {
        "user": "postgres",
        "password": "postgres",
        "driver": "org.postgresql.Driver"
    }
    return url, properties

# Ejecutar consultas SQL en PostgreSQL usando Spark JDBC
def execute_query(spark, query):
    url, properties = get_postgres_connection()
    return spark.read.jdbc(url=url, table=f"({query}) as temp", properties=properties)

print("Generando reportes de Business Intelligence basados en el Data Warehouse.")

# Consultas SQL y visualización de datos
queries = {
    "Top 10 Productos más vendidos": """
        SELECT p.nombre, SUM(hv.cantidad) AS total_vendido
        FROM hechos_ventas hv
        JOIN productos p ON hv.id_producto = p.id
        GROUP BY p.nombre, hv.id_producto
        ORDER BY total_vendido DESC
        LIMIT 10
    """,
    
    "Top 5 Clientes con más pedidos": """
        SELECT c.nombre, COUNT(hv.id_cliente) AS num_pedidos
        FROM hechos_ventas hv
        JOIN clientes c ON hv.id_cliente = c.id
        GROUP BY c.nombre, hv.id_cliente
        ORDER BY num_pedidos DESC
        LIMIT 5
    """,
    
    "Top 5 Corresponsales con más pedidos": """
        SELECT c.nombre, COUNT(*) AS num_pedidos
        FROM hechos_ventas hv
        JOIN dim_corresponsales c ON hv.id_corresponsal = c.id_corresponsal
        GROUP BY c.nombre, hv.id_corresponsal
        ORDER BY num_pedidos DESC
        LIMIT 5
    """,
    
    "Total de pagos diario por productos": """
        SELECT ROW_NUMBER() OVER (ORDER BY TO_CHAR(hv.fecha, 'YYYY-MM-DD')) AS id,
               p.nombre AS nombre_producto,
               TO_CHAR(hv.fecha, 'YYYY-MM-DD') AS dia, 
               SUM(hv.total_pagado) AS total_diario
        FROM hechos_ventas hv
        JOIN dim_productos p ON hv.id_producto = p.id_producto
        GROUP BY p.nombre, hv.id_producto, dia
        ORDER BY dia DESC
    """,
    
    "Total de pagos mensual por productos": """
        SELECT ROW_NUMBER() OVER (ORDER BY TO_CHAR(DATE_TRUNC('month', hv.fecha), 'FMMonth YYYY')) AS id,
                p.nombre AS nombre_producto, 
               TO_CHAR(DATE_TRUNC('month', hv.fecha), 'FMMonth YYYY') AS mes, 
               SUM(hv.total_pagado) AS total_mensual
        FROM hechos_ventas hv
        JOIN dim_productos p ON hv.id_producto = p.id_producto
        GROUP BY p.nombre, hv.id_producto, TO_CHAR(DATE_TRUNC('month', hv.fecha), 'FMMonth YYYY'), DATE_TRUNC('month', hv.fecha)
        ORDER BY DATE_TRUNC('month', hv.fecha) DESC
    """,
    
    "Total de pagos diario por clientes": """
        SELECT ROW_NUMBER() OVER (ORDER BY TO_CHAR(hv.fecha, 'YYYY-MM-DD')) AS id,
                c.nombre AS nombre_cliente,
               TO_CHAR(hv.fecha, 'YYYY-MM-DD') AS dia, 
               SUM(hv.total_pagado) AS total_diario
        FROM hechos_ventas hv
        JOIN dim_clientes c ON hv.id_cliente = c.id_cliente
        GROUP BY c.nombre, hv.id_cliente, dia
        ORDER BY dia DESC
    """,
    
    "Total de pagos mensual por clientes": """
        SELECT ROW_NUMBER() OVER (ORDER BY TO_CHAR(DATE_TRUNC('month', hv.fecha), 'FMMonth YYYY')) AS id,
                c.nombre AS nombre_cliente, 
               TO_CHAR(DATE_TRUNC('month', hv.fecha), 'FMMonth YYYY') AS mes, 
               SUM(hv.total_pagado) AS total_mensual
        FROM hechos_ventas hv
        JOIN dim_clientes c ON hv.id_cliente = c.id_cliente
        GROUP BY c.nombre, hv.id_cliente, TO_CHAR(DATE_TRUNC('month', hv.fecha), 'FMMonth YYYY'), DATE_TRUNC('month', hv.fecha)
        ORDER BY DATE_TRUNC('month', hv.fecha) DESC
    """,
    
    "Total de pagos diario por corresponsal": """
        SELECT ROW_NUMBER() OVER (ORDER BY TO_CHAR(hv.fecha, 'YYYY-MM-DD')) AS id,
                c.nombre AS nombre_corresponsal,
               TO_CHAR(hv.fecha, 'YYYY-MM-DD') AS dia, 
               SUM(hv.total_pagado) AS total_diario
        FROM hechos_ventas hv
        JOIN dim_corresponsales c ON hv.id_corresponsal = c.id_corresponsal
        GROUP BY c.nombre, hv.id_corresponsal, dia
        ORDER BY dia DESC
    """,
    
    "Total de pagos mensual por corresponsal": """
        SELECT ROW_NUMBER() OVER (ORDER BY TO_CHAR(DATE_TRUNC('month', hv.fecha), 'FMMonth YYYY')) AS id,
                c.nombre AS nombre_corresponsal, 
               TO_CHAR(DATE_TRUNC('month', hv.fecha), 'FMMonth YYYY') AS mes, 
               SUM(hv.total_pagado) AS total_mensual
        FROM hechos_ventas hv
        JOIN dim_corresponsales c ON hv.id_corresponsal = c.id_corresponsal
        GROUP BY c.nombre, hv.id_corresponsal, TO_CHAR(DATE_TRUNC('month', hv.fecha), 'FMMonth YYYY'), DATE_TRUNC('month', hv.fecha)
        ORDER BY DATE_TRUNC('month', hv.fecha) DESC
    """,
    
    "Variación diaria de pedidos por productos": """
        WITH pedidos_diarios AS (
                SELECT hv.id_producto, 
                       TO_CHAR(hv.fecha, 'YYYY-MM-DD') AS dia, 
                       COUNT(*) AS total_pedidos
            FROM hechos_ventas hv
            GROUP BY hv.id_producto, dia
            ),
            variacion_pedidos AS (
                SELECT pd.id_producto,
                       pd.dia,
                       pd.total_pedidos,
                       LAG(pd.total_pedidos) OVER (PARTITION BY pd.id_producto ORDER BY pd.dia) AS pedidos_dia_anterior,
                       (pd.total_pedidos - LAG(pd.total_pedidos) OVER (PARTITION BY pd.id_producto ORDER BY pd.dia)) AS variacion_diaria
                FROM pedidos_diarios pd
            )
        SELECT ROW_NUMBER() OVER (ORDER BY vp.dia) AS id,
                dp.nombre AS nombre_producto, 
               vp.dia, 
               vp.total_pedidos, 
               vp.pedidos_dia_anterior, 
               vp.variacion_diaria
        FROM variacion_pedidos vp
        JOIN dim_productos dp ON vp.id_producto = dp.id_producto
        ORDER BY vp.dia DESC, vp.variacion_diaria DESC
    """,
    
    "Variación mensual de pedidos por productos": """
        WITH pedidos_mensuales AS (
                SELECT hv.id_producto, 
                       TO_CHAR(DATE_TRUNC('month', hv.fecha), 'FMMonth YYYY') AS mes, 
                       DATE_TRUNC('month', hv.fecha) AS fecha_truncada,
                       COUNT(*) AS total_pedidos
                FROM hechos_ventas hv
                GROUP BY hv.id_producto, DATE_TRUNC('month', hv.fecha)
            ),
            variacion_pedidos AS (
                SELECT pm.id_producto,
                       pm.mes,
                       pm.fecha_truncada,
                       pm.total_pedidos,
                       LAG(pm.total_pedidos) OVER (PARTITION BY pm.id_producto ORDER BY pm.fecha_truncada) AS pedidos_mes_anterior,
                       (pm.total_pedidos - LAG(pm.total_pedidos) OVER (PARTITION BY pm.id_producto ORDER BY pm.fecha_truncada)) AS variacion_mensual
                FROM pedidos_mensuales pm
            )
        SELECT ROW_NUMBER() OVER (ORDER BY vp.mes) AS id,
                dp.nombre AS nombre_producto, 
               vp.mes, 
               vp.total_pedidos, 
               vp.pedidos_mes_anterior, 
               vp.variacion_mensual
        FROM variacion_pedidos vp
        JOIN dim_productos dp ON vp.id_producto = dp.id_producto
        ORDER BY vp.fecha_truncada DESC, vp.variacion_mensual DESC
    """,
    
    "Variación mensual de pedidos por corresponsal": """
        WITH pedidos_mensuales AS (
                SELECT hv.id_corresponsal, 
                       TO_CHAR(DATE_TRUNC('month', hv.fecha), 'FMMonth YYYY') AS mes, 
                       DATE_TRUNC('month', hv.fecha) AS fecha_truncada,
                       COUNT(*) AS total_pedidos
                FROM hechos_ventas hv
                GROUP BY hv.id_corresponsal, DATE_TRUNC('month', hv.fecha)
            ),
            variacion_pedidos AS (
                SELECT pm.id_corresponsal,
                       pm.mes,
                       pm.fecha_truncada,
                       pm.total_pedidos,
                       LAG(pm.total_pedidos) OVER (PARTITION BY pm.id_corresponsal ORDER BY pm.fecha_truncada) AS pedidos_mes_anterior,
                       (pm.total_pedidos - LAG(pm.total_pedidos) OVER (PARTITION BY pm.id_corresponsal ORDER BY pm.fecha_truncada)) AS variacion_mensual
                FROM pedidos_mensuales pm
            )
        SELECT ROW_NUMBER() OVER (ORDER BY vp.mes) AS id,
                dc.nombre AS nombre_corresponsal, 
               vp.mes, 
               vp.total_pedidos, 
               vp.pedidos_mes_anterior, 
               vp.variacion_mensual
        FROM variacion_pedidos vp
        JOIN dim_corresponsales dc ON vp.id_corresponsal = dc.id_corresponsal
        ORDER BY vp.fecha_truncada DESC, vp.variacion_mensual DESC
    """
}

print("resultado")

# Inicializar Spark
spark = get_spark_session()

# Ejecutar las consultas con Spark JDBC
dataframes = {}
for titulo, query in queries.items():
    df = execute_query(spark, query)
    dataframes[titulo] = df
    #print(f"Resultado para '{titulo}':")
    #df.show()


pagina_actual = 1  # Página inicial
registros_por_pagina = 10  # Número de registros por página

tabla_output = widgets.Output()
grafico_output = widgets.Output()
botones_output = widgets.Output()
dropdown_reportes = widgets.Dropdown(options=list(dataframes.keys()), description="Reporte:")
boton_anterior = widgets.Button(description="⏪ Anterior")
boton_siguiente = widgets.Button(description="⏩ Siguiente")


def mostrar_grafico():
    reporte_seleccionado = dropdown_reportes.value
    spark_df = dataframes[reporte_seleccionado]  # Spark DataFrame
    
    # Convertir Spark DataFrame a Pandas
    pandas_df = spark_df.toPandas()

    grafico_output.clear_output()
    with grafico_output:
        plt.figure(figsize=(8, 4))
        sns.barplot(x=pandas_df.iloc[:, 0], y=pandas_df.iloc[:, 1], hue=pandas_df.iloc[:, 0], palette="Blues_r", legend=False)
        plt.title(reporte_seleccionado)
        plt.xlabel(pandas_df.columns[0])
        plt.ylabel(pandas_df.columns[1])
        plt.xticks(rotation=45)
        plt.show()





def ajustar_decimales(df, escala=2):
    for col_name, dtype in df.dtypes:
        match = re.match(r"decimal\((\d+),(\d+)\)", dtype)
        if match:
            precision = int(match.group(1))
            nueva_precision = max(precision, escala + 2)
            
            df = df.withColumn(col_name, col(col_name).cast(f"decimal({nueva_precision},{escala})"))

    return df



def mostrar_pagina(event=None):
    global pagina_actual
    reporte_seleccionado = dropdown_reportes.value
    df = dataframes[reporte_seleccionado]

    #print("Columnas en el DataFrame:", df.columns)

    #print("Columnas en el DataFrame:", df.collect())

    
    if event is not None:
        pagina_actual = 1


    #window_spec = Window.orderBy("nombre")
    
    #window_spec = Window.orderBy("nombre")
    #df = df.withColumn("id", row_number().over(window_spec))

    
    total_paginas = (df.count() // registros_por_pagina) + (1 if df.count() % registros_por_pagina > 0 else 0)

    inicio = (pagina_actual - 1) * registros_por_pagina
    fin = inicio + registros_por_pagina

    #df_paginated = df.limit(fin).filter(df.index >= inicio)

    #df_paginated = df.filter((col("id") > inicio) & (col("id") <= fin))
    

    #print("Columnas en el DataFrame:", df.collect())
    
    #window_spec = Window.orderBy("nombre")
    #df = df.withColumn("row_number", row_number().over(window_spec))
    
    # Filtrar por rango de registros

    df = ajustar_decimales(df)

        

    df_paginated = df.select("*")

    if(reporte_seleccionado == "Total de pagos diario por productos"):
        df_paginated = df.filter((df.id > inicio) & (df.id <= fin))

    #df_paginated.index = range(inicio + 1, inicio + 1 + len(df_paginated))

    # Limpiar y mostrar nueva salida
    tabla_output.clear_output()
    with tabla_output:
        print(f"\n📌 {reporte_seleccionado} - Página {pagina_actual} de {total_paginas}\n")
        display(df_paginated.show())

    
    

    
    botones_output.clear_output()
    with botones_output:
        if df.count() > 10:
            display(widgets.HBox([boton_anterior, boton_siguiente]))

    
    grafico_output.clear_output()
    with grafico_output:
        if df.count() <= 10:
            mostrar_grafico()
            #display(grafico_output)




def actualizar_pagina(event):
    global pagina_actual
    df = dataframes[dropdown_reportes.value]
    total_paginas = (df.count() // registros_por_pagina) + (1 if df.count() % registros_por_pagina > 0 else 0)

    if event.description == "⏪ Anterior" and pagina_actual > 1:
        pagina_actual -= 1
    elif event.description == "⏩ Siguiente" and pagina_actual < total_paginas:
        pagina_actual += 1

    mostrar_pagina()




# 📌 Asignar eventos
dropdown_reportes.observe(mostrar_pagina, names="value")
boton_anterior.on_click(actualizar_pagina)
boton_siguiente.on_click(actualizar_pagina)


pagina_selector = widgets.IntSlider(value=1, min=1, max=10, step=1, description="Página")

#widgets.interactive(mostrar_pagina, pagina=pagina_selector)
#display(pagina_selector)

# Inicializar la primera visualización
mostrar_pagina(pagina_actual)
display(dropdown_reportes)
display(tabla_output)
display(botones_output)
display(grafico_output)


Generando reportes de Business Intelligence basados en el Data Warehouse.
resultado


Dropdown(description='Reporte:', options=('Top 10 Productos más vendidos', 'Top 5 Clientes con más pedidos', '…

Output()

Output()

Output()