1. Configuración del ETL

Este trabajo se realizará las siguientes actividades sin importar el formato del archivo lee csv .xlsx y json:

    Limpieza del archivo: Eliminar filas duplicadas y vacías.
    Validación: Verificar que el esquema del archivo sea consistente con la tabla destino.
    Transformación: Ajustar las columnas si es necesario.

In [0]:
import pandas as pd
import os
from pathlib import Path

#Cargamos la rutas de ingreso y salida
ruta_raw = "/dbfs/mnt/ingest_data/raw/"
ruta_processed = "/dbfs/mnt/ingest_data/processed/"

# Obtenemos la lista de archivos en la carpeta raw
archivos = [str(f) for f in Path(ruta_raw).glob("*") if f.is_file()]

# Procesamos los archivos en la carpeta raw
for ruta_archivo in archivos:
    extension = os.path.splitext(ruta_archivo)[1].lower()

    # Leemos el archivo según su formato
    if extension == ".csv":
        df = pd.read_csv(ruta_archivo)
    elif extension == ".json":
        df = pd.read_json(ruta_archivo)
    elif extension == ".xlsx":
        df = pd.read_excel(ruta_archivo, sheet_name=0)
    else:
        print(f"Formato no soportado: {ruta_archivo}")
        continue

    # Eliminamos duplicados
    df_sin_duplicados = df.drop_duplicates()

    # Eliminamos filas vacías basadas en columnas clave
    columnas_clave = ["id", "nombre"]  # Cambia según las columnas esperadas
    df_limpio = df_sin_duplicados.dropna(subset=[c for c in columnas_clave if c in df_sin_duplicados.columns])

    # 3. Validamos el esquema esperado
    esquema_esperado = ["id_servicios_restaurantes","servicios_restaurantes","direccion","id_condado,condado","codigo_postal_condado","latitud_condado","longitud_condado","id_ciudad","ciudad","codigo_postal_ciudad","latitud,longitud","estado,categorias","puntuacion_usuarios","analisis_sentimientos","url_del_negocio","enlaces_google_maps","anio"
]
    df_validado = df_limpio[[c for c in esquema_esperado if c in df_limpio.columns]]

    # Guardamos el archivo procesado en la carpeta `processed`
    nombre_archivo = os.path.basename(ruta_archivo)
    ruta_salida = f"{ruta_processed}etl_{os.path.splitext(nombre_archivo)[0]}.parquet"
    df_validado.to_parquet(ruta_salida, index=False)

    print(f"ETL completado para {ruta_archivo}. Datos procesados guardados en: {ruta_salida}")







ETL completado para /dbfs/mnt/ingest_data/raw/datos_de_prueba.csv. Datos procesados guardados en: /dbfs/mnt/ingest_data/processed/etl_datos_de_prueba.parquet
