In [17]:
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler, LabelEncoder, OrdinalEncoder
from sklearn.impute import SimpleImputer
import os
import mlflow
from dotenv import load_dotenv
from joblib import Memory
import joblib

In [2]:
# Load environment variables via python-dotenv.
# NOTE(review): the AWS credentials read later with os.getenv presumably come from here — confirm.
load_dotenv()

True

In [3]:
# Load the data

In [4]:
# Read the training CSV into memory.
# NOTE(review): the saved cell output echoes this line, which looks like a pandas
# parser warning (possibly mixed column dtypes) — confirm and consider passing an
# explicit dtype=... mapping.
data = pd.read_csv("data/train_ver2.csv")

  data = pd.read_csv("data/train_ver2.csv")


In [5]:
# Each data preprocessing step gets its own transformer class

In [6]:
# Drops columns that are not needed downstream.
class DropColumns(BaseEstimator, TransformerMixin):
    """Stateless transformer that removes the configured columns from a frame.

    Parameters
    ----------
    columns : list-like
        Column labels to remove.
    """

    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        """Nothing is learned; return ``self`` unchanged."""
        return self

    def transform(self, X):
        """Return ``X`` without the configured columns.

        Labels that are absent from ``X`` are skipped (``errors='ignore'``).
        """
        unwanted = self.columns
        return X.drop(labels=unwanted, axis='columns', errors='ignore')

# Prepares numeric columns.
class ProcessNumericColumns(BaseEstimator, TransformerMixin):
    """Coerce columns to numeric and fill the gaps with the column median.

    The medians are learned in ``fit`` and reused in ``transform`` so that
    data transformed later is imputed with the statistics of the data the
    transformer was fitted on, not with its own median.

    Parameters
    ----------
    columns : list-like
        Column labels to process.

    Attributes
    ----------
    medians_ : dict
        Column label -> median computed in ``fit`` (after numeric coercion).
    """

    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        """Learn the median of every configured column; returns ``self``."""
        self.medians_ = {
            col: pd.to_numeric(X[col], errors='coerce').median()
            for col in self.columns
        }
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the columns numeric and gap-free.

        Values that cannot be parsed become NaN (``errors='coerce'``) and are
        then filled together with the values that were already missing.
        """
        X = X.copy()
        # Backward compatibility: an unfitted instance (or a column added
        # after fitting) falls back to the median of the batch being transformed.
        learned = getattr(self, 'medians_', {})
        for col in self.columns:
            values = pd.to_numeric(X[col], errors='coerce')
            median = learned[col] if col in learned else values.median()
            X[col] = values.fillna(median)
        return X

# Prepares categorical columns.
class ProcessCategoricalColumns(BaseEstimator, TransformerMixin):
    """Normalise categorical columns to stripped strings with an 'unknown' bucket.

    For every configured column: missing values become ``'unknown'``, values
    are cast to ``str`` and stripped of surrounding whitespace, the
    placeholder strings ``'NA'``, ``''`` and ``'nan'`` become ``'unknown'``,
    and finally the per-column replacements in ``mappings`` are applied.

    Parameters
    ----------
    columns : list-like
        Column labels to process.
    """

    # Per-column value replacements applied after normalisation. Values that
    # are not listed here are left as they are.
    mappings = {
        'indrel_1mes': {
            '1.0': '1',
            '2.0': '2',
            '3.0': '3',
            '4.0': '4'
        }
    }

    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        """Nothing is learned; return ``self`` unchanged."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the configured columns normalised."""
        X = X.copy()
        for col in self.columns:
            # Remember which cells are missing before the string cast so that
            # None / NaN / pd.NA all end up in the same 'unknown' bucket.
            missing = X[col].isna()
            values = X[col].astype(str).str.strip()
            values = values.mask(missing, 'unknown')
            values = values.replace(['NA', '', 'nan'], 'unknown')
            if col in self.mappings:
                # replace() keeps values that are absent from the mapping;
                # map() would silently turn every one of them into NaN.
                values = values.replace(self.mappings[col])
            X[col] = values
        return X

# Prepares date columns.
class ProcessDateColumns(BaseEstimator, TransformerMixin):
    """Stateless transformer that parses the configured columns as datetimes.

    Parameters
    ----------
    columns : list-like
        Column labels to parse.
    """

    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        """Nothing is learned; return ``self`` unchanged."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the configured columns parsed.

        Values that cannot be parsed become ``NaT`` (``errors='coerce'``).
        """
        parsed = X.copy()
        for name in self.columns:
            raw = parsed[name]
            parsed[name] = pd.to_datetime(raw, errors='coerce')
        return parsed

# Prepares boolean columns.
class ProcessBooleanColumns(BaseEstimator, TransformerMixin):
    """Stateless transformer that casts the configured columns to ``bool``.

    Columns listed in ``mappings`` are first translated through their lookup
    table; every column then has its missing values set to ``False`` and is
    cast with ``astype(bool)``.

    Parameters
    ----------
    columns : list-like
        Column labels to process.
    """

    # Lookup tables for columns whose raw values are letter codes.
    mappings = {
        'indresi': {'S': True, 'N': False},
        'indext': {'S': True, 'N': False},
        'indfall': {'S': True, 'N': False},
        'sexo': {'H': True, 'V': False}
    }

    def __init__(self, columns):
        self.columns = columns

    def fit(self, X, y=None):
        """Nothing is learned; return ``self`` unchanged."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the configured columns as booleans."""
        result = X.copy()
        for name in self.columns:
            lookup = self.mappings.get(name)
            if lookup is not None:
                # Codes absent from the lookup come out of map() as NaN and
                # are therefore turned into False by the fill below.
                result[name] = result[name].map(lookup)
            result[name] = result[name].fillna(False).astype(bool)
        return result

# Removes outliers.
class RemoveOutliers(BaseEstimator, TransformerMixin):
    """Drop rows whose values fall outside the IQR fences of given columns.

    For each column the fences are ``Q1 - threshold * IQR`` and
    ``Q3 + threshold * IQR``; a row is kept only if it lies within the fences
    (inclusive) of every configured column.

    The fences are computed from the frame passed to ``transform``.
    NOTE(review): this step changes the number of rows; only ``X`` is
    filtered, so a separately held target would no longer line up — confirm
    that this is acceptable for the intended use.

    Parameters
    ----------
    columns : list-like
        Numeric column labels to check.
    threshold : float, default 1.5
        Multiplier applied to the inter-quartile range.
    """

    def __init__(self, columns, threshold=1.5):
        self.columns = columns
        self.threshold = threshold

    def fit(self, X, y=None):
        """Nothing is learned; return ``self`` unchanged."""
        return self

    def transform(self, X):
        """Return a new frame containing only the rows inside all fences."""
        # All fences are derived from the unfiltered frame, so one combined
        # mask selects the same rows as filtering column by column.
        keep = np.ones(len(X), dtype=bool)
        for col in self.columns:
            # Both quartiles in a single pass instead of four quantile() calls.
            q1, q3 = X[col].quantile([0.25, 0.75])
            lower = q1 - self.threshold * (q3 - q1)
            upper = q3 + self.threshold * (q3 - q1)
            # between() is inclusive on both ends; NaN compares as False and
            # is dropped, matching the original >= / <= filter.
            keep &= X[col].between(lower, upper).to_numpy()
        return X[keep]

# Encodes categorical columns as integer labels.
class LabelEncodeColumns(BaseEstimator, TransformerMixin):
    """Fit one ``LabelEncoder`` per configured column and apply it.

    Values are cast to ``str`` both when fitting and when transforming.

    Parameters
    ----------
    columns : list-like
        Column labels to encode.

    Attributes
    ----------
    encoders : dict
        Column label -> fitted ``LabelEncoder``; filled in by ``fit``.
    """

    def __init__(self, columns):
        self.columns = columns
        self.encoders = {}

    def fit(self, X, y=None):
        """Fit an encoder on each configured column; returns ``self``."""
        for name in self.columns:
            as_text = X[name].astype(str)
            self.encoders[name] = LabelEncoder().fit(as_text)
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the configured columns label-encoded."""
        encoded = X.copy()
        for name in self.columns:
            as_text = encoded[name].astype(str)
            encoded[name] = self.encoders[name].transform(as_text)
        return encoded

In [7]:
# Count how many rows fall into each `segmento` value.
data['segmento'].value_counts()

segmento
02 - PARTICULARES     7960220
03 - UNIVERSITARIO    4935579
01 - TOP               562142
Name: count, dtype: int64

In [12]:
# Columns grouped by the treatment they receive in the pipeline.
numeric_columns = ['age', 'antiguedad', 'renta']
datetime_columns = ['fecha_dato', 'fecha_alta']
columns_to_drop = ['tipodom', 'nomprov', 'ult_fec_cli_1t', 'conyuemp']

categorical_columns = [
    'ind_empleado', 'pais_residencia', 'indrel_1mes',
    'tiprel_1mes', 'canal_entrada', 'segmento'
]

one_hot_encoding_columns = ['segmento']

# Every column with exactly two distinct values is treated as boolean.
# All dropped columns are excluded (not just 'conyuemp'): they no longer exist
# when the boolean step runs, so listing one would raise a KeyError there.
# The membership test comes first so nunique() is not computed for them.
# NOTE(review): columns without an entry in ProcessBooleanColumns.mappings are
# cast with astype(bool); a numeric column whose two values are both non-zero
# would become all True — verify the selected columns.
boolean_columns = [
    col for col in data.columns
    if col not in columns_to_drop and data[col].nunique() == 2
]

In [13]:
# Stage 1: coerce types, fill missing values and remove outliers.
preprocess_data_pipeline = Pipeline(
    steps=[
        ('drop_columns', DropColumns(columns=columns_to_drop)),
        ('process_numeric', ProcessNumericColumns(columns=numeric_columns)),
        ('remove_outliers', RemoveOutliers(columns=numeric_columns)),
        ('process_categorical', ProcessCategoricalColumns(columns=categorical_columns)),
        ('process_boolean', ProcessBooleanColumns(columns=boolean_columns)),
        ('process_dates', ProcessDateColumns(columns=datetime_columns)),
    ],
)

# Constant-fill any remaining gaps, then one-hot encode; categories unseen
# during fitting are ignored at transform time.
one_hot_pipeline = Pipeline(
    steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='unknown')),
        ('onehot', OneHotEncoder(handle_unknown='ignore')),
    ],
)

# Stage 2: per-column feature transformations; columns not listed here are
# passed through unchanged.
column_transformer = ColumnTransformer(
    transformers=[
        ('one_hot', one_hot_pipeline, one_hot_encoding_columns),
        ('numeric', StandardScaler(), numeric_columns),
    ],
    remainder='passthrough',
)

# Full pipeline: both stages chained, with fitted steps cached on disk.
pipeline = Pipeline(
    steps=[
        ('data_preprocessing', preprocess_data_pipeline),
        ('feature_transformation', column_transformer),
    ],
    memory="tmp/cache",
)

In [14]:
# Fit every pipeline step on the loaded dataset and transform it in one pass.
# NOTE(review): the 'remove_outliers' step filters rows, so the result may have
# fewer rows than `data`.
processed_data = pipeline.fit_transform(data)

In [20]:
# Persist the fitted pipeline and the transformed data locally; the same files
# are uploaded to MLflow further down.
local_artifact_dir = "artifacts"
pipeline_path = os.path.join(local_artifact_dir, "pipeline.pkl")
data_path = os.path.join(local_artifact_dir, "processed_data.csv")

os.makedirs(local_artifact_dir, exist_ok=True)

# NOTE(review): the pickle references the transformer classes defined in this
# notebook, so they presumably must be importable wherever it is loaded — confirm.
joblib.dump(pipeline, pipeline_path)

# The transformed output is wrapped in a DataFrame without column labels, so
# the CSV header is the default positional index.
pd.DataFrame(processed_data).to_csv(data_path, index=False)

In [22]:
# Address of the MLflow tracking server.
TRACKING_SERVER_HOST = "127.0.0.1"
TRACKING_SERVER_PORT = 5000

EXPERIMENT_NAME = "final_pr_eda_experiment"
RUN_NAME = "eda"

assets_dir = "assets"
os.makedirs(assets_dir, exist_ok=True)

# S3-compatible endpoint used by MLflow for artifact storage.
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "https://storage.yandexcloud.net"

# The S3 credentials must already be present in the environment. Copying an
# environment variable onto itself does nothing when it is set and fails with
# an opaque TypeError when it is not, so check explicitly and fail clearly.
missing_credentials = [
    name
    for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    if name not in os.environ
]
if missing_credentials:
    raise RuntimeError(
        "Missing required environment variables: "
        + ", ".join(missing_credentials)
    )

# Tracking and model registry share the same server.
tracking_uri = f"http://{TRACKING_SERVER_HOST}:{TRACKING_SERVER_PORT}"
mlflow.set_tracking_uri(tracking_uri)
mlflow.set_registry_uri(tracking_uri)

In [24]:
# Reuse the experiment when it already exists, otherwise create it. The lookup
# is done once: a second call costs another round-trip to the server and could
# return None if the experiment disappeared in between.
experiment = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
if experiment is not None:
    experiment_id = experiment.experiment_id
else:
    experiment_id = mlflow.create_experiment(name=EXPERIMENT_NAME)

# Log the preprocessing parameter and upload the locally saved artifacts.
with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
    run_id = run.info.run_id

    mlflow.log_param("imputer_strategy", "constant")
    mlflow.log_artifact(pipeline_path, artifact_path="models")
    mlflow.log_artifact(data_path, artifact_path="data")

🏃 View run eda at: http://127.0.0.1:5000/#/experiments/1/runs/ee993741b40b4a539d487685462fc6bf
🧪 View experiment at: http://127.0.0.1:5000/#/experiments/1


In [None]:
# Build an ALS model; usage frequency will serve as the measure.