In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from delta.tables import DeltaTable
from pyspark.sql import Window

SILVER_TABLE = "workspace.javier_altoe.propiedades_silver"

# === Leer Bronze ===
bronze_df = spark.table("workspace.javier_altoe.propiedades_bronze")

# ====== Limpieza base ======
df_clean = (
    bronze_df
    .dropna(subset=["id"])
    .dropDuplicates(["id"])
)

# Trims en strings
string_cols = [f.name for f in df_clean.schema.fields if isinstance(f.dataType, StringType)]
for c in string_cols:
    df_clean = df_clean.withColumn(c, F.trim(F.col(c)))

# Ubicación/calle
df_clean = df_clean.withColumn("ubicacion", F.lower(F.col("ubicacion")))
df_clean = df_clean.withColumn("calle", F.initcap(F.col("calle")))

# ID → BIGINT (solo dígitos)
df_clean = df_clean.withColumn("id", F.expr("cast(nullif(regexp_replace(id, '[^0-9]', ''), '') as bigint)"))
df_clean = df_clean.filter(F.col("id").isNotNull())

# ====== Limpieza robusta para DOUBLEs ======
def clean_double(colname: str):
    return F.expr(f"""
        cast(
          case
            when {colname} is null then null
            when lower({colname}) = 'nan' then null
            when {colname} rlike '[A-Za-z]' then null
            else nullif(regexp_replace({colname}, '[^0-9\\.]', ''), '')
          end as double
        )
    """)

# Precio
df_clean = df_clean.withColumn("precio", clean_double("precio"))
df_clean = df_clean.withColumn("precio", F.when(F.col("precio").isNull() | F.isnan("precio"), -1.0).otherwise(F.col("precio")))

# Expensas
df_clean = df_clean.withColumn("expensas", clean_double("expensas"))
df_clean = df_clean.withColumn("expensas", F.when(F.col("expensas").isNull() | F.isnan("expensas"), -1.0).otherwise(F.col("expensas")))

# Metros cuadrados totales
df_clean = df_clean.withColumn("metros_cuadrados_totales", clean_double("metros_cuadrados_totales"))
df_clean = df_clean.withColumn("metros_cuadrados_totales", F.when(F.col("metros_cuadrados_totales").isNull() | F.isnan("metros_cuadrados_totales"), -1.0).otherwise(F.col("metros_cuadrados_totales")))

# Metros cuadrados cubiertos
df_clean = df_clean.withColumn("metros_cuadrados_cubiertos", clean_double("metros_cuadrados_cubiertos"))
df_clean = df_clean.withColumn("metros_cuadrados_cubiertos", F.when(F.col("metros_cuadrados_cubiertos").isNull() | F.isnan("metros_cuadrados_cubiertos"), -1.0).otherwise(F.col("metros_cuadrados_cubiertos")))

# ====== INTs ======
def clean_int(colname: str):
    return F.expr(f"""
        cast(
          case
            when {colname} is null then null
            when lower({colname}) = 'nan' then null
            when {colname} rlike '[A-Za-z]' then null
            else nullif(regexp_replace({colname}, '[^0-9-]', ''), '')
          end as int
        )
    """)

# Ambientes
df_clean = df_clean.withColumn("ambientes", clean_int("ambientes"))
df_clean = df_clean.withColumn("ambientes", F.when(F.col("ambientes").isNull(), -1).otherwise(F.col("ambientes")))

# Numero → extraer primer número
df_clean = df_clean.withColumn("numero", F.regexp_extract(F.col("numero"), r'(\d+)', 1))
df_clean = df_clean.withColumn("numero", F.when(F.col("numero") == "", None).otherwise(F.col("numero").cast("int")))
df_clean = df_clean.withColumn("numero", F.when(F.col("numero").isNull(), -1).otherwise(F.col("numero")))

# Piso
df_clean = df_clean.withColumn("piso", clean_int("piso"))
df_clean = df_clean.withColumn("piso", F.when(F.col("piso").isNull(), 0).otherwise(F.col("piso")))

# Antigüedad
df_clean = df_clean.withColumn("antiguedad", clean_int("antiguedad"))
df_clean = df_clean.withColumn("antiguedad", F.when(F.col("antiguedad").isNull(), -1).otherwise(F.col("antiguedad")))

# ====== Categóricos ======
df_clean = df_clean.withColumn(
    "orientacion_cardinal",
    F.when(F.col("orientacion_cardinal").isNull() | (F.lower("orientacion_cardinal")== "none"), "no tiene")
     .otherwise(F.col("orientacion_cardinal"))
)
df_clean = df_clean.withColumn(
    "orientacion_inmueble",
    F.when(F.col("orientacion_inmueble").isNull() | (F.lower("orientacion_inmueble")== "none"), "no tiene")
     .otherwise(F.col("orientacion_inmueble"))
)
df_clean = df_clean.withColumn("cochera",       F.when(F.col("cochera").isNull(), "-1").otherwise(F.col("cochera")))
df_clean = df_clean.withColumn("estado",        F.when(F.col("estado").isNull(), "-1").otherwise(F.col("estado")))
df_clean = df_clean.withColumn("tipo_vendedor", F.when(F.col("tipo_vendedor").isNull(), "-1").otherwise(F.col("tipo_vendedor")))

# Normalizar categorías
df_clean = df_clean.withColumn(
    "moneda",
    F.when(F.lower("moneda").like("%peso%"), "ARS")
     .when(F.lower("moneda").like("%dolar%"), "USD")
     .otherwise("desconocido")
)
df_clean = df_clean.withColumn(
    "tipo_de_operacion",
    F.when(F.lower("tipo_de_operacion").like("%alquiler%"), "alquiler")
     .when(F.lower("tipo_de_operacion").like("%venta%"), "venta")
     .otherwise("otro")
)

# Fecha → timestamp
df_clean = df_clean.withColumn("fecha", F.expr("try_to_timestamp(fecha, 'yyyy-MM-dd HH:mm:ss')"))

# ====== Me quedo con la última fila por URL según fecha ======
window = Window.partitionBy("url").orderBy(F.col("fecha").desc())
df_clean = (
    df_clean.withColumn("row_num", F.row_number().over(window))
            .filter(F.col("row_num") == 1)
            .drop("row_num")
)

# Orden de columnas
ordered_cols = [
    "id","ubicacion","numero","calle","precio","expensas","tipo_de_operacion","moneda",
    "ambientes","metros_cuadrados_totales","metros_cuadrados_cubiertos","orientacion_cardinal",
    "orientacion_inmueble","piso","cochera","antiguedad","estado","tipo_vendedor",
    "original_text","url","zona","fecha"
]
df_clean = df_clean.select(ordered_cols)

# ====== MERGE por URL ======
if spark.catalog.tableExists(SILVER_TABLE):
    silver_delta = DeltaTable.forName(spark, SILVER_TABLE)
    (
        silver_delta.alias("silver")
        .merge(
            df_clean.alias("updates"),
            "silver.url = updates.url and silver.precio = updates.precio"           
        )
        .whenMatchedUpdate(set={
            "expensas": "updates.expensas",
            "fecha":    "updates.fecha"
            # Agregá más campos si querés actualizarlos en caso de match.
        })
        .whenNotMatchedInsertAll()
        .execute()
    )
    print("🔄 Merge ejecutado: Silver actualizado")
else:
    df_clean.write.format("delta").mode("overwrite").saveAsTable(SILVER_TABLE)
    print("🆕 Tabla Silver creada:", SILVER_TABLE)


In [0]:
%sql
select count(1)
from workspace.javier_altoe.propiedades_silver;

/*
delete from workspace.javier_altoe.propiedades_silver
where id in (
'282109',
'251636',
'238405',
'124777',
'155976')*/