In [4]:
# %% [markdown]
# # 2. Transformación (Capa Plata)
# Limpieza, tipado fuerte y Quality Gate con bifurcación

# %%
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, when, lit
from delta import *

# %%
# Spark session with Delta Lake support for the Silver-layer transformation.
builder = (
    SparkSession.builder
    # Name the application after this stage so it is not confused with the
    # Bronze ingestion job when both show up in the Spark master UI.
    .appName("Lab_SECOP_Silver")
    .master("spark://spark-master:7077")
    # Register the Delta SQL extension and the Delta-aware session catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.executor.memory", "1g")
)

# configure_spark_with_delta_pip comes from the `delta` package import above.
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# Keep the notebook output readable: only WARN and above are logged.
spark.sparkContext.setLogLevel("WARN")


In [8]:
# Delta table locations for each lakehouse layer used by this notebook.
BRONZE_PATH = "file:/app/data/lakehouse/bronze/secop"
SILVER_PATH = "file:/app/data/lakehouse/silver/secop"
QUAR_PATH = "file:/app/data/lakehouse/quarantine/secop_errors"

# Load the Bronze Delta table that feeds the transformation.
df_bronze = (
    spark.read
    .format("delta")
    .load(BRONZE_PATH)
)


# %%
# Date typing: parse the Bronze column `fecha_de_firma` with the yyyy-MM-dd
# pattern and expose the result as a new column named `fecha_firma`.
df_typed = df_bronze.withColumn(
    "fecha_firma",
    to_date(col("fecha_de_firma"), "yyyy-MM-dd"),
)

# %%
# Quality rules and rejection reason.
# Rules are evaluated in order and only the first failing rule is recorded;
# rows that pass every rule get a NULL `motivo_rechazo`.
#
# The numeric checks run on an explicit DOUBLE cast of precio_base. Without it,
# a value that cannot be read as a number makes `precio_base <= 0` evaluate to
# NULL, so the row skips every rule and is treated as valid.
# NOTE(review): assumes precio_base may arrive from Bronze as a string — confirm
# against the Bronze schema. For an already-numeric column the cast does not
# change the outcome of the original three rules.
precio_base_num = col("precio_base").cast("double")

df_qc = df_typed.withColumn(
    "motivo_rechazo",
    when(col("precio_base").isNull(), lit("precio_base_nulo"))
    # Present but not convertible to a number.
    .when(precio_base_num.isNull(), lit("precio_base_no_numerico"))
    .when(precio_base_num <= 0, lit("precio_base_no_positivo"))
    .when(col("fecha_firma").isNull(), lit("fecha_firma_nula"))
    .otherwise(lit(None))
)


In [9]:
# Fork the data on the quality-gate outcome:
# rows without a rejection reason are valid, rows with one are invalid.
df_validos = df_qc.where(col("motivo_rechazo").isNull())
df_invalidos = df_qc.where(col("motivo_rechazo").isNotNull())

# %%
# Write the Silver layer (valid records only), replacing any previous content.
df_validos.write.format("delta").mode("overwrite").save(SILVER_PATH)

# %%
# Write the Quarantine layer (invalid records together with their rejection
# reason), replacing any previous content.
df_invalidos.write.format("delta").mode("overwrite").save(QUAR_PATH)
# %%
# Report the row counts actually persisted in each Delta table.
# df_validos / df_invalidos are lazy, so calling .count() on them would re-run
# the whole Bronze -> quality-gate lineage two more times; counting the tables
# that were just written avoids that and confirms what was saved.
silver_count = spark.read.format("delta").load(SILVER_PATH).count()
quarantine_count = spark.read.format("delta").load(QUAR_PATH).count()

print("✅ Capa Silver generada:", silver_count, "registros")
print("⚠️ Registros en Quarantine:", quarantine_count)

                                                                                

✅ Capa Silver generada: 1000 registros
⚠️ Registros en Quarantine: 0


26/02/02 02:18:43 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
26/02/02 02:18:44 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.errors.SparkCoreErrors$.clusterSchedulerError(SparkCoreErrors.scala:291)
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:981)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:165)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:263)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:170)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.proce