## 1. Generar los esquemas en formato medallón (bronce, plata y oro) para comenzar a guardar la data proveniente del Storage Account de Azure (Gen2). 💼🧳💪


In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS bronce;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE SCHEMA IF NOT EXISTS gold;

## 2. Guardar la Shared Access Signature (SAS) como secreto usando Databricks Secrets para una conexión segura al Storage. 🔑🔐

In [0]:
# Verificamos que los secretos existen previamente.

display(dbutils.secrets.listScopes())
display(dbutils.secrets.list("databricks-course-secret-scope"))

name
databricks-course-secret-scope


key
databricks-course-sas-token


In [0]:
dbc_sas_token = dbutils.secrets.get(scope="databricks-course-secret-scope", key="databricks-course-sas-token")
print(dbc_sas_token)

[REDACTED]


In [0]:
spark.conf.set("fs.azure.account.auth.type.sadatabrickscourse001.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.sadatabrickscourse001.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.sadatabrickscourse001.dfs.core.windows.net", dbc_sas_token)

In [0]:
#Comprobamos que tenemos acceso
display(dbutils.fs.ls("abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net"))

path,name,size,modificationTime
abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/categorias/,categorias/,0,1750552959000
abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/,clientes/,0,1750552924000
abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/productos/,productos/,0,1750552935000
abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/ventas/,ventas/,0,1750552944000


In [0]:
display(dbutils.fs.ls("abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/"))

path,name,size,modificationTime
abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes.csv,clientes.csv,4486,1750553065000
abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,clientes_5.csv,3112,1756910909000


## 3. Realizar la ingesta de los archivos CSV usando Auto Loader: ☁️☁️
- Utilizar el formato 'cloudFiles'.
- Habilitar la evolución del esquema (schema evolution) en lectura y escritura.
- Guardar los datos en formato Delta dentro de un esquema llamado 'bronce'.
- Utilizamos metadata.
- Agregamos columnas para auditoría.

🚨🚨🚨 IMPORTANTE 🚨🚨🚨

Utilizamos "awaitTermination" para asegurarnos que todo se cargue correctamente. 


In [0]:
# Cargamos las librerías necesarias

from pyspark.sql.functions import col, schema_of_json, from_json, lit, when, current_timestamp, input_file_name, expr, max, row_number, desc
from pyspark.sql import DataFrame
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StringType, NumericType
from delta.tables import DeltaTable
import json, uuid, time


In [0]:
# Guardamos los nombres del storage y del contenedor desde Azure Storage Gen2.
container = "container-course-databricks-001"
storage = "sadatabrickscourse001"

# Generamos la función para ingestar los datos.
def ingestar_auto_loader(location, table_name):
  path_location_azure = f"abfss://{container}@{storage}.dfs.core.windows.net/{location}"
  schema_location = f"/Filestore/course/databricks/advanced/autoloader/schema/datapath/data/{table_name}"
  checkpoint_location = f"/Filestore/course/databricks/advanced/autoloader/checkpoint/datapath/dataclass/{table_name}"

# Leemos los datos en modo streaming con rescue.
  df_streaming = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", schema_location)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(path_location_azure)
  )

# Enriquecemos con metadata
  df_enriched = (df_streaming
    .withColumn("ingestion_ts", current_timestamp()) # Marca temporal de ingesta
    .withColumn("ingestion_id", expr("uuid()")) # ID único de ingesta
    .withColumn("source_file", col("_metadata.file_name")) # Nombre del archivo
    .withColumn("source_path", col("_metadata.file_path")) # Ruta completa
    .withColumn("file_modification_time", col("_metadata.file_modification_time")) # Modificación del archivo en el storage
    .withColumn("file_size", col("_metadata.file_size")) # Tamaño del archivo
  )

# Guardamos en bronze
  (df_enriched 
      .writeStream
      .trigger(availableNow=True)
      .option("mergeSchema", "true")
      .option("checkpointLocation", checkpoint_location)
      .toTable(f"hive_metastore.bronce.{table_name}")
      .awaitTermination()
  )

##4. Limpiar duplicados y valores nulos, y mover los datos al esquema 'plata'. 🛠️⚡
- Detectamos cambios con la columna rescue_data.
- Inferimos esquema.
- Utilizamos funciones para simplificar la lectura.
- Guardamos Delta.

In [0]:
def replace_nulls(df):

    # Detectamos columnas string y numéricas.
    str_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
    num_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, NumericType)]

    # Reemplazamos nulos por "NA" o 0 según tipo de dato
    return df.fillna("NA", subset=str_cols).fillna(0, subset=num_cols)
    

In [0]:
location = dbutils.widgets.get("location")
table_name = dbutils.widgets.get("table_name")

def limpiar_y_guardar_silver(table_name):

    # Cargamos la tabla desde bronce.
    df_bronce = spark.table(f"hive_metastore.bronce.{table_name}")

    # Eliminamos duplicados y nulos.
    id_col = df_bronce.columns[0]
    df_bronce = df_bronce.filter(col(id_col).isNotNull())

    window = Window.partitionBy(id_col).orderBy(desc("ingestion_ts"))

    df_bronce = (
        df_bronce
        .withColumn("rn", row_number().over(window))
        .filter(col("rn") == 1)
        .drop("rn")
    )

    df_bronce = replace_nulls(df_bronce)
    
    has_rescued_data = "_rescued_data" in df_bronce.columns
    
    # Buscamos rescues del último batch, no de todo el histórico.
    if has_rescued_data:

        last_ts = df_bronce.agg(max("ingestion_ts")).collect()[0][0]

        # Filtramos solo ese batch y con rescued_data válido.
        df_rescued = (df_bronce
            .filter(col("ingestion_ts") == last_ts)
            .filter((col("_rescued_data").isNotNull()) & (col("_rescued_data") != "NA"))
        )

        rescued_non_empty = df_rescued.limit(1).count() > 0 
    else:
        rescued_non_empty = False

    # Si la columna "_rescued_data" contiene datos, intentamos parsearla.
    if has_rescued_data and rescued_non_empty:

        # Recorremos los rescue
        rescued_rows = df_rescued.limit(50).collect()

        for row in rescued_rows:
            rescued_json_str = row["_rescued_data"]

            # Inferimos el schema a partir de cada JSON nuevo.
            schema_col = schema_of_json(lit(rescued_json_str))
            schema_str = spark.range(1).select(schema_col.alias("schema_json")).collect()[0]["schema_json"]
            inferred_schema = StructType.fromDDL(schema_str)

            # Parseamos y extraemos campos nuevos.
            df_bronce = df_bronce.withColumn("rescued_json", from_json(col("_rescued_data"), inferred_schema))
            for field in inferred_schema.fieldNames():
                if field not in df_bronce.columns:
                    df_bronce = df_bronce.withColumn(field, col("rescued_json").getField(field))

        # Limpiamos columnas auxiliares.
        cols_to_drop = [c for c in ["_rescued_data", "rescued_json", "_file_path"] if c in df_bronce.columns]
        df_silver = df_bronce.drop(*cols_to_drop).dropDuplicates()

    else:
        
        df_silver = df_bronce.drop("_rescued_data").dropDuplicates()

    df_silver = replace_nulls(df_silver)

    # Ruta y nombre de la tabla destino.
    path_silver = f"dbfs:/user/hive/warehouse/silver.db/{table_name}"
    full_table_name = f"hive_metastore.silver.{table_name}"

    # Guardamos como Delta Table (creamos o actualizamos).
    if not DeltaTable.isDeltaTable(spark, path_silver):
        df_silver.write.format("delta").saveAsTable(full_table_name)
    else:

        # Habilitamos la evolución del esquema.
        spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

        delta_tbl = DeltaTable.forPath(spark, path_silver)
        id_col = df_silver.columns[0]
        delta_tbl.alias("t").merge(df_silver.alias("s"), f"t.{id_col} = s.{id_col}") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

# Ejecutamos ingesta y limpieza.
ingestar_auto_loader(location, table_name)
limpiar_y_guardar_silver(table_name)

cliente_id,nombre,correo,ciudad,_rescued_data,ingestion_ts,ingestion_id,source_file,source_path,file_modification_time,file_size
100,Juan Gomez,cliente100@correo.com,Lima,"{""continente"":""Asia"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,f291160e-d96c-4630-b7c9-f490ad74a0f8,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
43,Maria Gomez,cliente43@correo.com,Mendoza,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,2a70cc1b-9f41-4cb5-aef9-76c64bfa4cd9,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
44,Juan Gomez,cliente44@correo.com,Barranquilla,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,9c5284ec-7e84-4266-889a-6c4de7aeb7cf,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
45,Maria Perez,cliente45@correo.com,Lima,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,8d5185d0-0ddc-42e9-ab39-f4dc98461c6e,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
46,Juan Gomez,cliente46@correo.com,Merida,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,d4b23198-cd86-4416-846f-d702e865dc62,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
47,Maria Gomez,cliente47@correo.com,Medellin,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,9397a7f0-4ece-4b3d-86d9-d50011f931fd,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
48,Juan Perez,cliente48@correo.com,Mendoza,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,28b6b6b9-2ce3-4df8-97e7-4f442765cb66,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
49,Maria Gomez,cliente49@correo.com,Barranquilla,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,e6d92430-bab1-4a05-bea8-22c5fab29ef2,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
50,Juan Gomez,cliente50@correo.com,Lima,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,07f1ba65-bcf0-4933-b85a-c2b8abedf8fd,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
51,Maria Perez,cliente51@correo.com,Merida,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,1055aeae-3fc5-464e-b467-2a9071ccfec9,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112


cliente_id,nombre,correo,ciudad,ingestion_ts,ingestion_id,source_file,source_path,file_modification_time,file_size,continente
47,Maria Gomez,cliente47@correo.com,Medellín,2025-09-02T19:32:29.774Z,90d8ec08-4f47-424c-adb7-b1332b79c5f6,clientes.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes.csv,2025-06-22T00:44:25Z,4486,
30,Juan Perez,cliente30@correo.com,Cartagena,2025-09-02T19:32:29.774Z,bbaf8e34-c48f-4e19-a065-e2b6bf526489,clientes.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes.csv,2025-06-22T00:44:25Z,4486,
67,Maria Gomez,cliente67@correo.com,Medellin,2025-09-03T15:09:26.995Z,8c820ff9-ad23-4387-b569-b10e6530d0d1,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112,America
26,Juan Gomez,cliente26@correo.com,Bogotá,2025-09-02T19:32:29.774Z,f095a4c7-1a56-40e6-81d5-fd11e0f8871b,clientes.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes.csv,2025-06-22T00:44:25Z,4486,
96,Juan Perez,cliente96@correo.com,Merida,2025-09-03T15:09:26.995Z,5482249f-9a64-4a52-883e-928fea4f2079,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112,America
15,Maria Perez,cliente15@correo.com,Cartagena,2025-09-02T19:32:29.774Z,3a5db812-8e37-4aa3-b78d-df0df16fc14b,clientes.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes.csv,2025-06-22T00:44:25Z,4486,
60,Juan Perez,cliente60@correo.com,Lima,2025-09-03T15:09:26.995Z,4dac6e6f-e20f-442c-a1e4-2a85ce294522,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112,America
38,Juan Gomez,cliente38@correo.com,Cali,2025-09-02T19:32:29.774Z,9b338013-69e7-4d37-bcf5-679c114e849a,clientes.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes.csv,2025-06-22T00:44:25Z,4486,
67,Maria Gomez,cliente67@correo.com,Medellín,2025-09-02T19:32:29.774Z,5a7380c4-ac6b-4927-a2ea-07ad5b8f2aac,clientes.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes.csv,2025-06-22T00:44:25Z,4486,
87,Maria Perez,cliente87@correo.com,Medellín,2025-09-02T19:32:29.774Z,ee08474d-79ca-46c1-9ca2-bab216bb377e,clientes.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes.csv,2025-06-22T00:44:25Z,4486,


cliente_id,nombre,correo,ciudad,_rescued_data,ingestion_ts,ingestion_id,source_file,source_path,file_modification_time,file_size
100,Juan Gomez,cliente100@correo.com,Lima,"{""continente"":""Asia"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,f291160e-d96c-4630-b7c9-f490ad74a0f8,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
43,Maria Gomez,cliente43@correo.com,Mendoza,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,2a70cc1b-9f41-4cb5-aef9-76c64bfa4cd9,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
44,Juan Gomez,cliente44@correo.com,Barranquilla,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,9c5284ec-7e84-4266-889a-6c4de7aeb7cf,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
45,Maria Perez,cliente45@correo.com,Lima,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,8d5185d0-0ddc-42e9-ab39-f4dc98461c6e,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
46,Juan Gomez,cliente46@correo.com,Merida,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,d4b23198-cd86-4416-846f-d702e865dc62,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
47,Maria Gomez,cliente47@correo.com,Medellin,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,9397a7f0-4ece-4b3d-86d9-d50011f931fd,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
48,Juan Perez,cliente48@correo.com,Mendoza,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,28b6b6b9-2ce3-4df8-97e7-4f442765cb66,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
49,Maria Gomez,cliente49@correo.com,Barranquilla,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,e6d92430-bab1-4a05-bea8-22c5fab29ef2,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
50,Juan Gomez,cliente50@correo.com,Lima,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,07f1ba65-bcf0-4933-b85a-c2b8abedf8fd,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112
51,Maria Perez,cliente51@correo.com,Merida,"{""continente"":""America"",""_file_path"":""abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv""}",2025-09-03T15:09:26.995Z,1055aeae-3fc5-464e-b467-2a9071ccfec9,clientes_5.csv,abfss://container-course-databricks-001@sadatabrickscourse001.dfs.core.windows.net/clientes/clientes_5.csv,2025-09-03T14:48:29Z,3112


[0;31m---------------------------------------------------------------------------[0m
[0;31mUnsupportedOperationException[0m             Traceback (most recent call last)
File [0;32m<command-5670470672346535>, line 89[0m
[1;32m     87[0m [38;5;66;03m# Ejecutamos ingesta y limpieza.[39;00m
[1;32m     88[0m ingestar_auto_loader(location, table_name)
[0;32m---> 89[0m limpiar_y_guardar_silver(table_name)

File [0;32m<command-5670470672346535>, line 85[0m, in [0;36mlimpiar_y_guardar_silver[0;34m(table_name)[0m
[1;32m     80[0m delta_tbl [38;5;241m=[39m DeltaTable[38;5;241m.[39mforPath(spark, path_silver)
[1;32m     81[0m id_col [38;5;241m=[39m df_silver[38;5;241m.[39mcolumns[[38;5;241m0[39m]
[1;32m     82[0m delta_tbl[38;5;241m.[39malias([38;5;124m"[39m[38;5;124mt[39m[38;5;124m"[39m)[38;5;241m.[39mmerge(df_silver[38;5;241m.[39malias([38;5;124m"[39m[38;5;124ms[39m[38;5;124m"[39m), [38;5;124mf[39m[38;5;124m"[39m[38;5;124mt.[39m[38;5;1

##5. Crear una tabla final consolidada en el esquema 'oro' que combine: 🔚
- Ventas
- Información de clientes
- Detalles de productos
- Categorías

In [0]:
# Generamos una función para eliminar columnas de auditoria.

AUDIT_COLS = [
    "ingestion_ts",
    "ingestion_id",
    "source_file",
    "source_path",
    "file_modification_time",
    "file_size"
]

def drop_audit_cols(df):
    cols_to_drop = [c for c in AUDIT_COLS if c in df.columns]
    return df.drop(*cols_to_drop) if cols_to_drop else df


[0;31m---------------------------------------------------------------------------[0m
[0;31mUnsupportedOperationException[0m             Traceback (most recent call last)
File [0;32m<command-5670470672346535>, line 89[0m
[1;32m     87[0m [38;5;66;03m# Ejecutamos ingesta y limpieza.[39;00m
[1;32m     88[0m ingestar_auto_loader(location, table_name)
[0;32m---> 89[0m limpiar_y_guardar_silver(table_name)

File [0;32m<command-5670470672346535>, line 85[0m, in [0;36mlimpiar_y_guardar_silver[0;34m(table_name)[0m
[1;32m     80[0m delta_tbl [38;5;241m=[39m DeltaTable[38;5;241m.[39mforPath(spark, path_silver)
[1;32m     81[0m id_col [38;5;241m=[39m df_silver[38;5;241m.[39mcolumns[[38;5;241m0[39m]
[1;32m     82[0m delta_tbl[38;5;241m.[39malias([38;5;124m"[39m[38;5;124mt[39m[38;5;124m"[39m)[38;5;241m.[39mmerge(df_silver[38;5;241m.[39malias([38;5;124m"[39m[38;5;124ms[39m[38;5;124m"[39m), [38;5;124mf[39m[38;5;124m"[39m[38;5;124mt.[39m[38;5;1

In [0]:
tables = ["clientes_autoloader", "productos_autoloader", "categorias_autoloader", "ventas_autoloader"]

if all(spark.catalog.tableExists(f"hive_metastore.silver.{t}") for t in tables):

    clientes = spark.table("hive_metastore.silver.clientes_autoloader")
    productos = spark.table("hive_metastore.silver.productos_autoloader")
    categorias = spark.table("hive_metastore.silver.categorias_autoloader")
    ventas = spark.table("hive_metastore.silver.ventas_autoloader")

    # Realizamos los joins entre hechos y dimensiones eliminando los campos de auditoria.
    df_gold = drop_audit_cols(ventas) \
        .join(drop_audit_cols(clientes), on="cliente_id", how="inner") \
        .join(drop_audit_cols(productos), on="producto_id", how="inner") \
        .join(drop_audit_cols(categorias), on="categoria_id", how="inner") \

    df_gold = df_gold.dropDuplicates()

    gold_table_name = "hive_metastore.gold.tabla_final_consolidada"
    gold_path_name = "dbfs:/user/hive/warehouse/gold.db/tabla_final_consolidada"

    if not DeltaTable.isDeltaTable(spark, gold_path_name):
        df_gold.write.format("delta").saveAsTable(gold_table_name)
        print("Tabla oro creada correctamente.")
    else:
        delta_oro = DeltaTable.forPath(spark, gold_path_name)
        delta_oro.alias("t").merge(df_gold.alias("s"), "t.venta_id = s.venta_id") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

        print("Tabla oro actualizada con nuevos datos o cambios.")

else:
    print("No están disponibles todas las tablas necesarias.")

[0;31m---------------------------------------------------------------------------[0m
[0;31mUnsupportedOperationException[0m             Traceback (most recent call last)
File [0;32m<command-5670470672346535>, line 89[0m
[1;32m     87[0m [38;5;66;03m# Ejecutamos ingesta y limpieza.[39;00m
[1;32m     88[0m ingestar_auto_loader(location, table_name)
[0;32m---> 89[0m limpiar_y_guardar_silver(table_name)

File [0;32m<command-5670470672346535>, line 85[0m, in [0;36mlimpiar_y_guardar_silver[0;34m(table_name)[0m
[1;32m     80[0m delta_tbl [38;5;241m=[39m DeltaTable[38;5;241m.[39mforPath(spark, path_silver)
[1;32m     81[0m id_col [38;5;241m=[39m df_silver[38;5;241m.[39mcolumns[[38;5;241m0[39m]
[1;32m     82[0m delta_tbl[38;5;241m.[39malias([38;5;124m"[39m[38;5;124mt[39m[38;5;124m"[39m)[38;5;241m.[39mmerge(df_silver[38;5;241m.[39malias([38;5;124m"[39m[38;5;124ms[39m[38;5;124m"[39m), [38;5;124mf[39m[38;5;124m"[39m[38;5;124mt.[39m[38;5;1