In [0]:
spark.conf.set("spark.sql.ansi.enabled", "false")

In [0]:

from pyspark.sql.functions import sum, when, trim, to_date, col
from pyspark.sql.types import StructType, StructField, DateType, StringType, DoubleType, IntegerType


In [0]:
# Ingestación de datos
dbutils.widgets.text("catalog", "dev_ecommerce")
dbutils.widgets.text("storage_account","adlsproyecto2025")

In [0]:
catalog_param = dbutils.widgets.get("catalog")
storage_account = dbutils.widgets.get("storage_account")
# container y schema serán lo mismo
container = {"origen":"bronze","destino":"silver"}


In [0]:
spark.sql(f"USE CATALOG {catalog_param}")

In [0]:

df_bronze_init = spark.table(f"{catalog_param}.{container['origen']}.sales_orders")


In [0]:
# primer registro no está como cabecera
df_bronze_init.show(3)

In [0]:
# evidencias de rows incuidos el que debe ser la cabecera 1001, al realizar la transformacion debe quedar con 1000 rows.
df_bronze_init.count()

In [0]:
#Transformar el primer registro en nombres de la tabla
# -- obtener el primer registro
df_first_row = df_bronze_init.first()
# -- convertir el primer registro en nombres de columna
column_names = [str(cell) for cell in df_first_row]

# obtener todos los datos a excepcion de la 1era fila
df_bronze_init_no_fisrt_raw = df_bronze_init.subtract(df_bronze_init.limit(1))

df_bronze_fixed = df_bronze_init_no_fisrt_raw.toDF(*column_names)

# se evidencia que se tiene como se esparaba 1000 registros
df_bronze_fixed.count()

In [0]:
# com parte de la transforamcion se observa que todas las columnas estan con tipo de dato string
df_bronze_fixed.printSchema()

In [0]:
# Definimos el schema adecuado según los datos y previa confirmacion con el negocio

schema_orders_sales = StructType([
    StructField("Date", DateType(), True),
    StructField("Product_Category", StringType(), True),
    StructField("Price", DoubleType(), True),
    StructField("Discount", DoubleType(), True),
    StructField("Customer_Segment", StringType(), True),
    StructField("Marketing_Spend", DoubleType(), True),
    StructField("Units_Sold", IntegerType(), True)
])

In [0]:
# casteamos los campos con el schema definido

df_bronze_fixed_schema_correct =df_bronze_fixed.select([
    (to_date(col(c.name), "dd-MM-yyyy").alias("Date")
     if c.name == "Date" else col(c.name).cast(c.dataType).alias(c.name))
    for c in schema_orders_sales
])

# validamos el schema actualizado
df_bronze_fixed_schema_correct.printSchema()



In [0]:

from pyspark.sql.functions import col, when, expr

# validmas alguna reglas de negocio indicadas en el requerimiento:
# price no puede tener valores negativos ni cero
# Units_Sold no puede ser negativo y/o cero
# Discount no puede ser mayor a 100 soles 

df_invalid_rules = df_bronze_fixed_schema_correct.filter(
    (when(col("Price").isNotNull(), col("Price")).otherwise(0) <= 0) |
    (when(col("Units_Sold").isNotNull(), col("Units_Sold")).otherwise(0) < 0) |
    (when(col("Discount").isNotNull(), col("Discount")).otherwise(0) > 100)
)

# si devuelve vacio estamos bien
df_invalid_rules.show()

In [0]:
# se contruye la ruta destino

silver_path = f"abfss://{container['destino']}@{storage_account}.dfs.core.windows.net/orders_sales/"



In [0]:
# se procede a excribir los deltas
df_bronze_fixed_schema_correct.write.format("delta").mode("append").save(silver_path)

In [0]:
# registro de la tabla sales_ordes en el schema silver 

spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {catalog_param}.{container['destino']}.sales_orders
    USING DELTA
    LOCATION '{silver_path}'
""")