# Objetivo del laboratorio
Aplicar técnicas de validación de estructura, manejo de errores y trazabilidad al cargar datos de compras en un supermercado desde archivos JSON, dividiendo registros válidos e inválidos, y asegurando la calidad de los datos antes de su escritura.

### 1. subir archivos 
/Volumes/dmc/default/landing/sesion_03/ en tu entorno.

### 2. Definir esquema esperado con StructType

In [0]:
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType, TimestampType

compras_schema = (
    StructType()
    .add("compra_id", StringType())
    .add("cliente_id", StringType())
    .add("producto", StringType())
    .add("categoria", StringType())
    .add("cantidad", IntegerType())
    .add("precio_unitario", DoubleType())
    .add("fecha_compra", TimestampType())
)

### 3. Leer archivo JSON


In [0]:
from pyspark.sql.functions import col

df_compras = (
    spark.read
    .format("json")
    .schema(compras_schema)
    .load("/Volumes/dmc/default/source/sesion_03/")
    .withColumn("archivo_origen", col("_metadata.file_path"))
)

In [0]:
display(df_compras)

### 4. Leer archivo JSON y mapear inconsistencias

In [0]:
from pyspark.sql.types import StructType, StringType, IntegerType, DoubleType, TimestampType

compras_schema = (
    StructType()
    .add("compra_id", StringType())
    .add("cliente_id", StringType())
    .add("producto", StringType())
    .add("categoria", StringType())
    .add("cantidad", IntegerType())
    .add("precio_unitario", DoubleType())
    .add("fecha_compra", TimestampType())
    .add("_rescued_data", StringType())
)

In [0]:
from pyspark.sql.functions import col

df_compras = (
    spark.read
    .format("json")
    .schema(compras_schema)
    .option("columnNameOfCorruptRecord", "_rescued_data")
    #.option("badRecordsPath", "/Volumes/dmc/default/source/sesion_03/bad_records/") # _rescued_data se almacena en un directorio
    .load("/Volumes/dmc/default/source/sesion_03/")
    .withColumn("archivo_origen", col("_metadata.file_path"))
)

In [0]:
display(df_compras)

### 5. try/except

In [0]:
from pyspark.sql.functions import col

try:
    df_validos = df_compras.filter(col("_rescued_data").isNull())
    df_invalidos = df_compras.filter(col("_rescued_data").isNotNull())

    df_validos.write.format("delta").mode("overwrite").saveAsTable("dmc.default.compras_validadas")
    df_invalidos.write.format("delta").mode("overwrite").saveAsTable("dmc.default.compras_con_errores")

    print("Escritura realizada con éxito.")

except Exception as e:
    print(f"Error durante la escritura: {str(e)}")

In [0]:
%sql

select * from dmc.default.compras_validadas

In [0]:
%sql

select * from dmc.default.compras_con_errores