In [5]:
import os
from omegaconf import OmegaConf

#
config_path = "config/config.yaml"

if not os.path.exists(config_path):
    raise FileNotFoundError(f"no se encontro el file: {config_path}")

config = OmegaConf.load(config_path)
print(f"Archivo leido correctamente")


Archivo leido correctamente


In [None]:
from pyspark.sql.functions import col, when, lit, current_timestamp
from pyspark.sql.types import IntegerType, DecimalType
import pandas as pd
import shutil
import os


def get_data(spark, config):
    #salida fecha proceso 
    #(output_path: data/processed/${fecha_proceso})
    df = spark.read.option("header", True).option("inferSchema", True).csv(config.input.file_path)
    df = df.withColumn("fecha_proceso", col("fecha_proceso").cast("string"))
    
    return df.filter(
        (col("fecha_proceso") >= config.filters.start_date) & 
        (col("fecha_proceso") <= config.filters.end_date) & 
        (col("pais") == config.filters.country)
    )


def transform_data(df, config):

    df = df.dropDuplicates()

    df = df.withColumn(
        "cantidad_unidades",
        when(
            col("unidad") == "CS",
            col("cantidad") * config.processing.box_size #llamamos al parametro box_size del config.yaml que tiene como valor 20 
        ).when(
            col("unidad") == "ST",
            col("cantidad")
        ).otherwise(None) # Default null si no coincide ninguna unidad
    )


    routine_types = list(config.processing.delivery_types.routine)
    bonus_types = list(config.processing.delivery_types.bonus)
    all_types = routine_types + bonus_types


    df = df.filter(
        col("tipo_entrega").isin(all_types)
    )


    df = (
        df
        .withColumn(
            "entrega_rutina",
            col("tipo_entrega").isin(routine_types)
        )
        .withColumn(
            "entrega_bonificacion",
            col("tipo_entrega").isin(bonus_types)
        )
    )

    df = df.withColumn(
        "precio",
        when(col("precio") > 0, col("precio"))
        .cast(DecimalType(10, 2))
    )

    df = (
        df
        .dropna(subset=["cantidad_unidades"])
        .filter(col("cantidad_unidades") > 0)
    )

    return df

# Carga de datos
def load_data(df, config):
    #se estandarizaron los nombres de paises
    country_map = {
        "GT": "Guatemala", 
        "PE": "Peru", 
        "EC": "Ecuador", 
        "SV": "El Salvador", 
        "HN": "Honduras", 
        "JM": "Jamaica"
        }
    
    df = df.replace(country_map, subset=["pais"])
    
        #Estandar de nombres de columnas
    output_df = df.select(
        col("pais").alias("country"),
        col("material"),
        col("transporte").alias("transport"),
        col("ruta").alias("route"),
        col("precio").alias("price"),
        col("cantidad_unidades").cast(IntegerType()).alias("unit_quantity"),
        col("entrega_rutina").alias("routine_delivery"),
        col("entrega_bonificacion").alias("bonus_delivery"),
        col("fecha_proceso"), 
        current_timestamp().alias("load_date")
    )
    
    output_path = config.output.base_path
    if os.path.exists(output_path):
        shutil.rmtree(output_path)
    os.makedirs(output_path, exist_ok=True)
    

    
    pdf = output_df.toPandas() #se utilizo una conversión a Pandas para la ejecución  local en Windows sin dependencias de binarios de Hadoop (winutils.exe).
    
    pdf.to_parquet(output_path, engine='pyarrow', compression='snappy', partition_cols=['fecha_proceso'], index=False)
    
    #CSV
    for root, _, files in os.walk(output_path):
        for file in files:
            if file.endswith(".parquet"):
                parquet_file = os.path.join(root, file)
                csv_file = parquet_file.replace(".parquet", ".csv")
                pd.read_parquet(parquet_file).to_csv(csv_file, index=False)
                #print(f"Generado: {csv_file}")

In [7]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ETL_CASO_INGENIERIA_DATOS") \
    .master("local[*]") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

try:
    # obtener datos
    df_raw = get_data(spark, config)
    
    if df_raw.count() == 0:
        print("No se encontraron registros que cumplan con los filtros")
    else:
        print(f"Registros encontrados: {df_raw.count()}")

    #transformacion de la data y normalizacion
        df_processed = transform_data(df_raw, config)
        #df_processed.show(5)
        
    #Carga de los datos
        load_data(df_processed, config)
        
    print("--- Fin del Proceso Exitoso ---")

except Exception as e:
    print(f"Error durante la ejecucion: {e}")

finally:
    spark.stop()

Registros encontrados: 12
--- Fin del Proceso Exitoso ---
