In [0]:
dbutils.widgets.removeAll()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from decimal import Decimal

In [0]:
dbutils.widgets.text("catalogo", "catalog_xbautisv")
dbutils.widgets.text("esquema_source", "bronze")
dbutils.widgets.text("esquema_sink", "silver")

In [0]:
catalogo = dbutils.widgets.get("catalogo")
esquema_source = dbutils.widgets.get("esquema_source")
esquema_sink = dbutils.widgets.get("esquema_sink")

In [0]:
df_clientes = spark.table(f"{catalogo}.{esquema_source}.clientes").select("id_cliente", "nombre_cliente", "segmento_cliente").withColumnRenamed("id_cliente","id_cliente_j") 
df_productos = spark.table(f"{catalogo}.{esquema_source}.productos").select("id_producto", "nombre_producto", "categoria", "subcategoria").withColumnRenamed("id_producto","id_producto_j")
df_compras = spark.table(f"{catalogo}.{esquema_source}.compras").select("id_fila", "id_pedido", "fecha_pedido", "fecha_envio", "metodo_envio", "id_cliente", "ciudad", "provincia", "pais", "region", "id_producto", "monto_ventas", "monto_cantidad", "monto_descuento").withColumnRenamed("id_fila","id_item")
df_devoluciones = spark.table(f"{catalogo}.{esquema_source}.devoluciones").select("id_pedido","pedido_devuelto").withColumnRenamed("id_pedido","id_pedido_j")
df_personas = spark.table(f"{catalogo}.{esquema_source}.personas").select("region","gerente_regional").withColumnRenamed("region","region_j")

In [0]:
df_orden_compra = (df_compras.alias("a").join(df_clientes.alias("b"), col("a.id_cliente") == col("b.id_cliente_j"), "left")
                                        .join(df_productos.alias("c"), col("a.id_producto") == col("c.id_producto_j"), "left")
                                        .join(df_devoluciones.alias("d"), col("a.id_pedido") == col("d.id_pedido_j"), "left")
                                        .join(df_personas.alias("e"), col("a.region") == col("e.region_j"), "left"))


In [0]:
def calcula_subtotal(subtotal, cantidad, descuento):
    return (subtotal - descuento) * Decimal(cantidad)

In [0]:
subtotal_udf = F.udf(calcula_subtotal, DecimalType(10,2))

In [0]:
df_orden_compra_final = df_orden_compra.withColumn("subtotal", subtotal_udf(col("monto_ventas"), col("monto_cantidad"), col("monto_descuento")))

In [0]:
window_spec = (
    Window
    .partitionBy("id_pedido","id_cliente")          # PARTITION BY
    .orderBy("id_item")               # ORDER BY
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

In [0]:
df_acumulado = df_orden_compra_final.withColumn("monto_acumulado",F.sum("subtotal").over(window_spec))

In [0]:
ordencompra_select_df = df_orden_compra_final.select(col("id_pedido"),
                                                    col("id_cliente"),
                                                    col("nombre_cliente"),
                                                    col("segmento_cliente"),
                                                    col("fecha_pedido"),
                                                    col("fecha_envio"),
                                                    col("metodo_envio"),
                                                    col("ciudad"),
                                                    col("provincia"),
                                                    col("pais"),
                                                    col("region"),
                                                    col("id_item"),
                                                    col("id_producto"),
                                                    col("nombre_producto"),
                                                    col("categoria"),
                                                    col("subcategoria"),
                                                    when(col("pedido_devuelto").isNull(), "No").otherwise(col("pedido_devuelto")).alias("pedido_devuelto"),
                                                    col("gerente_regional"),
                                                    col("monto_ventas"),
                                                    col("monto_cantidad"),
                                                    col("monto_descuento"),
                                                    col("subtotal"))

In [0]:
spark.sql(f"DROP TABLE IF EXISTS {catalogo}.{esquema_sink}.ordenes_compra")

In [0]:
ordencompra_select_df.write.mode("overwrite").saveAsTable(f"{catalogo}.{esquema_sink}.ordenes_compra")