In [None]:
from pyspark.sql.functions import col, to_date, when, lit

# %%
# Transformar (con Quality Gate)

df_transformed = df_bronze \
    .withColumnRenamed("Precio Base", "precio_base") \
    .withColumnRenamed("Departamento", "departamento") \
    .withColumnRenamed("Fecha de Firma", "fecha_firma_str") \
    .withColumn("fecha_firma", to_date(col("fecha_firma_str"), "yyyy-MM-dd"))

# ----------------------------
# Reglas de Calidad
# ----------------------------
df_quality = df_transformed.withColumn(
    "motivo_rechazo",
    when(col("precio_base") <= 0, lit("Precio base menor o igual a 0"))
    .when(col("fecha_firma").isNull(), lit("Fecha de firma nula"))
    .otherwise(lit(None))
)

# ----------------------------
# Bifurcación (Split)
# ----------------------------
df_silver = df_quality \
    .filter(col("motivo_rechazo").isNull()) \
    .select("Entidad", "departamento", "precio_base", "fecha_firma")

df_quarantine = df_quality \
    .filter(col("motivo_rechazo").isNotNull())

# ----------------------------
# Escritura Plata
# ----------------------------
df_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .save("data/lakehouse/silver/secop")

# ----------------------------
# Escritura Cuarentena
# ----------------------------
df_quarantine.write \
    .format("delta") \
    .mode("overwrite") \
    .save("data/lakehouse/quarantine/secop_errors")

print("Quality Gate ejecutado correctamente")
print("Registros válidos:", df_silver.count())
print("Registros en cuarentena:", df_quarantine.count())

df_silver.show(5)
df_quarantine.show(5)
