In [99]:
import os
import datetime
from dateutil.relativedelta import relativedelta

import psycopg
import pandas as pd
import mlflow
from catboost import CatBoostClassifier
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import (
    OneHotEncoder, 
    SplineTransformer, 
    QuantileTransformer, 
    RobustScaler,
    PolynomialFeatures,
    KBinsDiscretizer,
)
from sklearn.metrics import (
    roc_auc_score,
    f1_score,
    precision_score,
    recall_score,
    confusion_matrix,
    log_loss,
)

##### 1. Определим глобальные перменные

In [104]:
TABLE_NAME = "users_churn"

TRACKING_SERVER_HOST = "127.0.0.1"
TRACKING_SERVER_PORT = 5000

EXPERIMENT_NAME = "churn_nikolaistepanov"
RUN_NAME = "preprocessing"
REGISTRY_MODEL_NAME = "churn_model_nikolaistepanov_prepared"

##### 2. Заберем данные из базы данных и сформируем `dataframe`

In [58]:
connection = {"sslmode": "verify-full", "target_session_attrs": "read-write"}
postgres_credentials = {
    "host": os.getenv("POSTGRES_HOST"),
    "port": os.getenv("POSTGRES_PORT"),
    "dbname": os.getenv("POSTGRES_DBNAME"),
    "user": os.getenv("POSTGRES_USER"),
    "password": os.getenv("POSTGRES_PASSWORD"),
}

connection.update(postgres_credentials)

In [59]:
with psycopg.connect(**connection) as conn:

    with conn.cursor() as cur:
        cur.execute(f"SELECT * FROM {TABLE_NAME}")
        data = cur.fetchall()
        columns = [col[0] for col in cur.description]

df = pd.DataFrame(data, columns=columns)

##### 3. Посмотрим формально на типы наших данных

In [60]:
df.dtypes

id                            int64
customer_id                  object
begin_date           datetime64[ns]
end_date             datetime64[ns]
type                         object
paperless_billing             int64
payment_method               object
monthly_charges             float64
total_charges               float64
internet_service             object
online_security               int64
online_backup                 int64
device_protection             int64
tech_support                  int64
streaming_tv                  int64
streaming_movies              int64
gender                       object
senior_citizen                int64
partner                       int64
dependents                    int64
multiple_lines                int64
target                        int64
dtype: object

Последовательно будем работать с каждым типом данных и преобразовывать его в нужный нам формат при помощи `encoders`

##### 3.1 Тип данных `object`

In [61]:
obj_df = df.select_dtypes(include="object")

In [62]:
obj_df.head(2)

Unnamed: 0,customer_id,type,payment_method,internet_service,gender
0,7590-VHVEG,Month-to-month,Electronic check,DSL,Female
1,5575-GNVDE,One year,Mailed check,DSL,Male


In [63]:
cat_columns = ["type", "payment_method", "internet_service", "gender"]

In [64]:
encoder_oh = OneHotEncoder(
    categories="auto",
    handle_unknown='ignore',
    max_categories=10,
    sparse_output=False,
    drop="first"
)
encoded_features = encoder_oh.fit_transform(df[cat_columns].to_numpy())

In [65]:
encoded_df = pd.DataFrame(
    encoded_features, 
    columns=encoder_oh.get_feature_names_out(cat_columns)
)
obj_df = pd.concat([obj_df, encoded_df], axis=1)

In [66]:
obj_df.head(2)

Unnamed: 0,customer_id,type,payment_method,internet_service,gender,type_One year,type_Two year,payment_method_Credit card (automatic),payment_method_Electronic check,payment_method_Mailed check,internet_service_Fiber optic,internet_service_No data,gender_Male
0,7590-VHVEG,Month-to-month,Electronic check,DSL,Female,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
1,5575-GNVDE,One year,Mailed check,DSL,Male,1.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0


##### 3.2 Тип данных `float`, `int64`

In [67]:
num_df = df.select_dtypes(include=["float64", "int64"])

In [68]:
num_df.head(2)

Unnamed: 0,id,paperless_billing,monthly_charges,total_charges,online_security,online_backup,device_protection,tech_support,streaming_tv,streaming_movies,senior_citizen,partner,dependents,multiple_lines,target
0,1,1,29.85,29.85,0,1,0,0,0,0,0,1,0,0,0
1,2,0,56.95,1889.5,1,0,1,0,0,0,0,0,0,0,0


In [69]:
num_columns = ["monthly_charges", "total_charges"]

In [70]:
encoder_spl = SplineTransformer(n_knots=3, degree=4)
encoded_features = encoder_spl.fit_transform(df[num_columns].to_numpy())

encoded_df = pd.DataFrame(
    encoded_features, 
    columns=encoder_spl.get_feature_names_out(num_columns)
)
num_df = pd.concat([num_df, encoded_df], axis=1)

In [71]:
n_quantiles=100

encoder_q = QuantileTransformer(n_quantiles=n_quantiles)
encoded_features = encoder_q.fit_transform(df[num_columns].to_numpy())

encoded_df = pd.DataFrame(
    encoded_features, 
    columns=encoder_q.get_feature_names_out(num_columns)
)
encoded_df.columns = [col + f"_q_{n_quantiles}" for col in num_columns]
num_df = pd.concat([num_df, encoded_df], axis=1)

In [72]:
encoder_rb = RobustScaler()
encoded_features = encoder_rb.fit_transform(df[num_columns].to_numpy())

encoded_df = pd.DataFrame(
    encoded_features, 
    columns=encoder_rb.get_feature_names_out(num_columns)
)
encoded_df.columns = [col + f"_robust" for col in num_columns]
num_df = pd.concat([num_df, encoded_df], axis=1)

In [73]:
degree = 3

encoder_pol = PolynomialFeatures(degree=degree)
encoded_features = encoder_pol.fit_transform(df[num_columns].to_numpy())

encoded_df = pd.DataFrame(
    encoded_features, 
    columns=encoder_pol.get_feature_names_out(num_columns)
)
encoded_df[encoded_df.columns[1 + len(num_columns):]]
num_df = pd.concat([num_df, encoded_df], axis=1)

In [74]:
n_bins = 5

encoder_kbd = KBinsDiscretizer(
    n_bins=n_bins, 
    encode='ordinal', 
    strategy='uniform', 
    subsample=None
)
encoded_features = encoder_kbd.fit_transform(df[num_columns].to_numpy())

encoded_df = pd.DataFrame(
    encoded_features, 
    columns=encoder_kbd.get_feature_names_out(num_columns)
)
encoded_df.columns = [col + f"_bin" for col in num_columns]
num_df = pd.concat([num_df, encoded_df], axis=1)

In [75]:
num_df.head(2)

Unnamed: 0,id,paperless_billing,monthly_charges,total_charges,online_security,online_backup,device_protection,tech_support,streaming_tv,streaming_movies,...,total_charges.1,monthly_charges^2,monthly_charges total_charges,total_charges^2,monthly_charges^3,monthly_charges^2 total_charges,monthly_charges total_charges^2,total_charges^3,monthly_charges_bin,total_charges_bin
0,1,1,29.85,29.85,0,1,0,0,0,0,...,29.85,891.0225,891.0225,891.0225,26597.021625,26597.02,26597.02,26597.02,0.0,0.0
1,2,0,56.95,1889.5,1,0,1,0,0,0,...,1889.5,3243.3025,107607.025,3570210.0,184706.077375,6128220.0,203323500.0,6745912000.0,1.0,1.0


##### 3.3 Тип данных `datetime`

In [76]:
dt_df = df.select_dtypes(include="datetime")

In [77]:
dt_df["current_date"] = datetime.datetime.now()

In [78]:
dt_df["days_diff"] = dt_df.apply(lambda row: (row["current_date"] - row["begin_date"]).days, axis=1)
dt_df["diff_years"] = dt_df.apply(lambda row: relativedelta(row["current_date"], row["begin_date"]).years, axis=1)
dt_df["diff_months"] = dt_df.apply(lambda row: relativedelta(row["current_date"], row["begin_date"]).months, axis=1)

##### 4. Объединим все `encoders` в `Pipeline` и `ColumnTransformer`

In [79]:
numeric_transformer = ColumnTransformer(
    transformers=[
        ("spl", encoder_spl, num_columns),
        ("q", encoder_q, num_columns),
        ("rb", encoder_rb, num_columns),
        ("pol", encoder_pol, num_columns),
        ("kbd", encoder_kbd, num_columns),
    ]
)

categorical_transformer = Pipeline(
    steps=[
        ("encoder", encoder_oh),
    ]
)

preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, num_columns),
        ("cat", categorical_transformer, cat_columns),
    ],
    n_jobs=-1
)

In [80]:
preprocessor

##### 4.1 Проверим, что все работает

In [81]:
encoded_features = preprocessor.fit_transform(df)

transformed_df = pd.DataFrame(
    encoded_features, 
    columns=preprocessor.get_feature_names_out()
)

In [82]:
transformed_df.head(2)

Unnamed: 0,num__spl__monthly_charges_sp_0,num__spl__monthly_charges_sp_1,num__spl__monthly_charges_sp_2,num__spl__monthly_charges_sp_3,num__spl__monthly_charges_sp_4,num__spl__monthly_charges_sp_5,num__spl__total_charges_sp_0,num__spl__total_charges_sp_1,num__spl__total_charges_sp_2,num__spl__total_charges_sp_3,...,num__kbd__monthly_charges,num__kbd__total_charges,cat__type_One year,cat__type_Two year,cat__payment_method_Credit card (automatic),cat__payment_method_Electronic check,cat__payment_method_Mailed check,cat__internet_service_Fiber optic,cat__internet_service_No data,cat__gender_Male
0,0.014583,0.335266,0.554993,0.09504,0.000118,0.0,0.040533,0.454885,0.461758,0.042824,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
1,0.000116,0.094742,0.554677,0.335807,0.014658,0.0,0.004242,0.228653,0.596333,0.169278,...,1.0,1.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0


##### 4.2 Сохраним `ColumnTransformer` в `MLFlow`

In [83]:
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "..."
os.environ["AWS_ACCESS_KEY_ID"] = "..."
os.environ["AWS_SECRET_ACCESS_KEY"] = "..."

In [84]:
mlflow.set_tracking_uri(f"http://{TRACKING_SERVER_HOST}:{TRACKING_SERVER_PORT}")
mlflow.set_registry_uri(f"http://{TRACKING_SERVER_HOST}:{TRACKING_SERVER_PORT}")

In [85]:
# experiment_id = mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id

# with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
#     run_id = run.info.run_id

#     mlflow.sklearn.log_model(preprocessor, "column_transformer")

##### 4.3 Проверим, что загруженный `ColumnTransformer` работает точно также, как и оригинальный

In [31]:
model_uri = f"runs:/{run_id}/column_transformer"

loaded_transformer = mlflow.sklearn.load_model(model_uri)

In [32]:
loaded_transformer

In [33]:
encoded_features = loaded_transformer.fit_transform(df)

transformed_df_loaded_from_mlflow = pd.DataFrame(
    encoded_features, 
    columns=preprocessor.get_feature_names_out()
)

In [34]:
assert transformed_df_loaded_from_mlflow.equals(transformed_df)

##### 5. Объединим признаки модель с изначальным набором данным и обучим модель с новыми признаками

In [86]:
df = pd.concat([df, transformed_df], axis=1)

In [87]:
df["current_date"] = datetime.datetime.now()
df["days_diff"] = df.apply(lambda row: (row["current_date"] - row["begin_date"]).days, axis=1)
df["diff_years"] = df.apply(lambda row: relativedelta(row["current_date"], row["begin_date"]).years, axis=1)
df["diff_months"] = df.apply(lambda row: relativedelta(row["current_date"], row["begin_date"]).months, axis=1)

In [95]:
features = (
    list(transformed_df.columns) + \
    [
        "days_diff", "diff_years", "diff_months"
    ] + \
    [
        'monthly_charges', 'total_charges', 'online_security', 'online_backup',
        'device_protection', 'tech_support', 'streaming_tv', 'streaming_movies',
        'senior_citizen', 'partner', 'dependents', 'multiple_lines'
    ]
)

target = "target"

loss_function = "Logloss"
task_type = 'CPU'
random_seed = 0
iterations = 400
verbose = False
max_depth=3

model = CatBoostClassifier(
    iterations=iterations, 
    loss_function=loss_function, 
    random_seed=random_seed, 
    task_type=task_type,
    verbose=verbose,
    max_depth=max_depth
)

test_size = 0.2
X_train, X_test, y_train, y_test = train_test_split(
    df[features],
    df[target],
    test_size=test_size,
    shuffle=False,
)

In [96]:
df.head(2)

Unnamed: 0,id,customer_id,begin_date,end_date,type,paperless_billing,payment_method,monthly_charges,total_charges,internet_service,...,cat__payment_method_Credit card (automatic),cat__payment_method_Electronic check,cat__payment_method_Mailed check,cat__internet_service_Fiber optic,cat__internet_service_No data,cat__gender_Male,current_date,days_diff,diff_years,diff_months
0,1,7590-VHVEG,2020-01-01,NaT,Month-to-month,1,Electronic check,29.85,29.85,DSL,...,0.0,1.0,0.0,0.0,0.0,0.0,2023-10-31 13:55:36.785985,1399,3,9
1,2,5575-GNVDE,2017-04-01,NaT,One year,0,Mailed check,56.95,1889.5,DSL,...,0.0,0.0,1.0,0.0,0.0,1.0,2023-10-31 13:55:36.785985,2404,6,6


In [97]:
print(f"Размер выборки для обучения: {X_train.shape}")
print(f"Размер выборки для теста: {X_test.shape}")

Размер выборки для обучения: (5634, 51)
Размер выборки для теста: (1409, 51)


In [98]:
model.fit(X_train, y_train)

<catboost.core.CatBoostClassifier at 0x2905cb510>

In [101]:
prediction = model.predict(X_test)
probas = model.predict_proba(X_test)[:, 1]

In [103]:
metrics = {}

_, err1, _, err2 = confusion_matrix(y_test, prediction, normalize='all').ravel()
auc = roc_auc_score(y_test, probas)
precision = precision_score(y_test, prediction)
recall = recall_score(y_test, prediction)
f1 = f1_score(y_test, prediction)
logloss = log_loss(y_test, prediction)

metrics["err1"] = err1
metrics["err2"] = err2
metrics["auc"] = auc
metrics["precision"] = precision
metrics["recall"] = recall
metrics["f1"] = f1
metrics["logloss"] = logloss

In [105]:
pip_requirements="../requirements.txt"
signature = mlflow.models.infer_signature(X_test, prediction)
input_example = X_test[:10]

experiment_id = mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id

with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
    run_id = run.info.run_id
    
    mlflow.log_metrics(metrics)
    cv_info = mlflow.sklearn.log_model(preprocessor, artifact_path="preprocessor")
    model_info = mlflow.catboost.log_model(
        cb_model=model, 
        artifact_path="models",
        signature=signature,
        input_example=input_example,
        registered_model_name=REGISTRY_MODEL_NAME,
        await_registration_for=60,
        pip_requirements=pip_requirements,
    )

  inputs = _infer_schema(model_input) if model_input is not None else None
Registered model 'churn_model_nikolaistepanov_prepared' already exists. Creating a new version of this model...
2023/10/31 13:59:04 INFO mlflow.tracking._model_registry.client: Waiting up to 60 seconds for model version to finish creation. Model name: churn_model_nikolaistepanov_prepared, version 9
Created version '9' of model 'churn_model_nikolaistepanov_prepared'.
