In [None]:
import sys
from pathlib import Path
sys.path.append(str(Path.cwd().parent.parent))

from src.utils.preprocessors import *
from src.utils.utils_fn import load_features_names
%load_ext autoreload
%autoreload 2

from sklearn.pipeline import Pipeline
import polars as pl
import joblib
import json
import warnings
warnings.simplefilter('ignore')

In [3]:
"""
Clasificación de artículos nuevos/usados para MercadoLibre
"""

def build_dataset():
    """Carga y prepara los datos de entrenamiento y prueba"""
    data_raw = [json.loads(line) for line in open('../../data/raw/MLA_100k_checked_v3.jsonlines')]
    return data_raw

if __name__ == '__main__':
    
    print('1. Cargando el dataset...')
    data_raw = build_dataset()
    
    # 1. Cargar el archivo YAML
    yaml_path = '../../config/features_names.yaml'
    column_mapping = load_features_names(yaml_path)
    
    # 2. Ejecutar Pipeline de preprocesadores
    print('2. Inicio de Preprocesamiento')
    pipe = Pipeline([
        ('raw_data', RawDataPreprocessor()),
        ('rename_columns', ColumnsRenameTransformer()),
        ('date_columns', DateColumnsTransformer(patterns=['time', 'date'])),
        ('duplicated_columns', DropDuplicateColumnsTransformer()),
        ('duplicated_rows', DropDuplicatedRowsTransformer()),
        ('nan_values', FillMissingValuesTransformer()),
        ('drop_columns', DropColumnsTransformer(0.2)),
        ('unnecessary_columns', DropUnnecessaryColumnsTransformer()),
        ('categorical_columns', CategoricalColumnsTransformer()),
        ('numeric_columns', NumericColumnTransformer()),
        ('aggreated_colummns', AggregatedColumnsTransformer(group_by_col='product_id', column_mapping=column_mapping))
    ])
    
    # Aplicar la pipeline a los datos y materializar el resultado
    data_processed: pl.DataFrame = pipe.fit_transform(data_raw).collect()
    print('3. ¡Preprocesamiento finalizado!')

1. Cargando el dataset...
2. Inicio de Preprocesamiento
3. ¡Preprocesamiento finalizado!


In [8]:
# Definir la ruta absoluta para la carpeta de pipelines
root_path = Path.cwd().resolve().parent.parent
# Crear el directorio si no existe
root_path.mkdir(parents=True, exist_ok=True)
artifact = 'pipeline_preprocessing.pkl'

# Exportamos el pipeline
joblib.dump(pipe, str(root_path / 'src/pipelines' / artifact))

['/home/lynn/Documentos/Development/Scripts_and_Notebooks/Proyectos Profesionales/Pruebas técnicas/MELI/src/pipelines/pipeline_preprocessing.pkl']

In [9]:
# Exportamos los datos
data_processed.write_parquet(str(root_path / 'data/processed/data_processed.parquet'))