In [1]:
import os
import numpy as np
import pandas as pd
from dotenv import load_dotenv
import psycopg2 as psycopg
load_dotenv()

True

In [2]:
connection = {"sslmode": "require", "target_session_attrs": "read-write"}
postgres_credentials = {
    "host": os.environ.get("DB_DESTINATION_HOST"), 
    "port": os.environ.get("DB_DESTINATION_PORT"),
    "dbname": os.environ.get("DB_DESTINATION_NAME"),
    "user": os.environ.get("DB_DESTINATION_USER"),
    "password": os.environ.get("DB_DESTINATION_PASSWORD"),
}
assert all([var_value != "" for var_value in list(postgres_credentials.values())])

connection.update(postgres_credentials)

# определим название таблицы, в которой хранятся наши данные.
TABLE_NAME = "clean_users_churn"

# эта конструкция создаёт контекстное управление для соединения с базой данных 
# оператор with гарантирует, что соединение будет корректно закрыто после выполнения всех операций 
# закрыто оно будет даже в случае ошибки, чтобы не допустить "утечку памяти"
with psycopg.connect(**connection) as conn:

    # создаёт объект курсора для выполнения запросов к базе данных
    # с помощью метода execute() выполняется SQL-запрос для выборки данных из таблицы TABLE_NAME
    with conn.cursor() as cur:
        cur.execute(f"SELECT * FROM {TABLE_NAME}")
                
        # извлекаем все строки, полученные в результате выполнения запроса
        data = cur.fetchall()

        # получает список имён столбцов из объекта курсора
        columns = [col[0] for col in cur.description]

# создаёт объект DataFrame из полученных данных и имён столбцов. 
# это позволяет удобно работать с данными в Python, используя библиотеку Pandas.
df = pd.DataFrame(data, columns=columns)

Готовим данные

In [3]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from category_encoders import CatBoostEncoder
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from catboost import CatBoostClassifier

In [4]:
features = df.drop(['target','id','customer_id','end_date','begin_date'],axis=1)

cat_features = features.select_dtypes(include='object')
potential_binary_features = cat_features.nunique() == 2
binary_cat_features = cat_features[potential_binary_features[potential_binary_features].index]
other_cat_features = cat_features[potential_binary_features[~potential_binary_features].index]
num_features = features.select_dtypes(['float'])

binary_cols = binary_cat_features.columns.tolist()
non_binary_cat_cols = other_cat_features.columns.tolist()
num_cols = num_features.columns.tolist()

In [5]:
from sklearn.model_selection import train_test_split
X_tr, X_val, y_tr, y_val = train_test_split(
    features,
    df['target'],
    stratify=df['target'])

In [8]:
preprocessor = ColumnTransformer([
            ('binary', OneHotEncoder(drop='if_binary', sparse_output=False), binary_cols),
            ('cat', OneHotEncoder(drop='if_binary'), non_binary_cat_cols),
            ('num', StandardScaler(), num_cols)
        ],
        remainder='drop',
        verbose_feature_names_out=False
    )
model = LogisticRegression(penalty='l2',C=0.2)
pipline = Pipeline([
            ('preprocessor', preprocessor),
            ('model', model)
        ]
    )
pipline.fit(X_tr, y_tr)

In [9]:
prediction = pipline.predict(X_val)
probas = pipline.predict_proba(X_val)

Создаём метрики

In [10]:
print(probas[:,1])

[0.18188652 0.11709711 0.45382686 ... 0.01241106 0.10547907 0.45459648]


In [11]:
from sklearn.metrics import f1_score,roc_auc_score,recall_score,confusion_matrix,log_loss,precision_score

# заведите словарь со всеми метриками
metrics = {}

# посчитайте метрики из модуля sklearn.metrics
# err_1 — ошибка первого рода
# err_2 — ошибка второго рода
_, err1, _, err2 = confusion_matrix(y_val,prediction,normalize='all').ravel()
auc = roc_auc_score(y_val,probas[:,1])
precision = precision_score(y_val,prediction)
recall = recall_score(y_val,prediction)
f1 = f1_score(y_val,prediction)
logloss = log_loss(y_val,prediction)

# запишите значения метрик в словарь
metrics["err1"] = err1
metrics["err2"] = err2
metrics["auc"] = auc
metrics["precision"] = precision
metrics["recall"] = recall
metrics["f1"] = f1
metrics["logloss"] = logloss

print(metrics)

{'err1': 0.08063600227143668, 'err2': 0.13287904599659284, 'auc': 0.8337310399835841, 'precision': 0.6223404255319149, 'recall': 0.5010706638115632, 'f1': 0.5551601423487544, 'logloss': 7.675394674002801}


Делаем словарь по исходным данным

In [12]:
counts_columns = [
    "type", "paperless_billing", "internet_service", "online_security", "online_backup", "device_protection",
    "tech_support", "streaming_tv", "streaming_movies", "gender", "senior_citizen", "partner", "dependents",
    "multiple_lines", "target"
]

stats = {}

for col in counts_columns:
		# посчитайте уникальные значения для колонок, где немного уникальных значений (переменная counts_columns)
    column_stat = df[col].value_counts()
    column_stat = {f"{col}_{key}": value for key, value in column_stat.items()}

		# обновите словарь stats
    stats.update(column_stat)


stats["data_length"] = df.shape[0]
stats["monthly_charges_min"] = df["monthly_charges"].min()
stats["monthly_charges_max"] = df["monthly_charges"].max()
stats["monthly_charges_mean"] = df["monthly_charges"].mean() # посчитайте среднее значение в колонке
stats["monthly_charges_median"] = df["monthly_charges"].median() # посчитайте медианное значение в колонке
stats["total_charges_min"] = df["total_charges"].min() # посчитайте минимальное значение в колонке
stats["total_charges_max"] = df["total_charges"].max() # посчитайте максимальное значение в колонке
stats["total_charges_mean"] = df["total_charges"].mean() # посчитайте среднее значение в колонке
stats["total_charges_median"] = df["total_charges"].median() # посчитайте медианное значение в колонке
stats["unique_customers_number"] = df["customer_id"].nunique() # посчитайте кол-во уникальных id
stats["end_date_nan"] = df["end_date"].isnull().sum() # посчитайте кол-во пустых строк в колонке

In [13]:
print(stats)

{'type_Month-to-month': 3875, 'type_Two year': 1695, 'type_One year': 1473, 'paperless_billing_Yes': 4171, 'paperless_billing_No': 2872, 'internet_service_Fiber optic': 4622, 'internet_service_DSL': 2421, 'online_security_No': 5024, 'online_security_Yes': 2019, 'online_backup_No': 4614, 'online_backup_Yes': 2429, 'device_protection_No': 4621, 'device_protection_Yes': 2422, 'tech_support_No': 4999, 'tech_support_Yes': 2044, 'streaming_tv_No': 4336, 'streaming_tv_Yes': 2707, 'streaming_movies_No': 4311, 'streaming_movies_Yes': 2732, 'gender_Male': 3555, 'gender_Female': 3488, 'senior_citizen_0': 5901, 'senior_citizen_1': 1142, 'partner_No': 3641, 'partner_Yes': 3402, 'dependents_No': 4933, 'dependents_Yes': 2110, 'multiple_lines_No': 4072, 'multiple_lines_Yes': 2971, 'target_0': 5174, 'target_1': 1869, 'data_length': 7043, 'monthly_charges_min': 18.25, 'monthly_charges_max': 118.75, 'monthly_charges_mean': 64.76169246059918, 'monthly_charges_median': 70.35, 'total_charges_min': 18.8, 'to

Записываем файлы артефактов

In [14]:
with open("columns.txt", "w", encoding="utf-8") as fio:
    fio.write(','.join(columns[1:]))

df.to_csv("users_churn.csv", index=False)

Готовим подключение к MLFlow

In [15]:
os.environ['MLFLOW_S3_ENDPOINT_URL']='https://storage.yandexcloud.net'
os.environ['AWS_BUCKET_NAME']=os.environ.get("S3_BUCKET_NAME")

In [16]:
import mlflow
TRACKING_SERVER_HOST = "127.0.0.1"
TRACKING_SERVER_PORT = 5000
mlflow.set_tracking_uri(f"http://{TRACKING_SERVER_HOST}:{TRACKING_SERVER_PORT}")
mlflow.set_registry_uri(f"http://{TRACKING_SERVER_HOST}:{TRACKING_SERVER_PORT}")

In [17]:
EXPERIMENT_NAME = "churn_ujhmaster"
experiment_id = mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id
#experiment_id = mlflow.create_experiment(EXPERIMENT_NAME)
print(experiment_id)

30


Логируем артефакты в MLFlow

In [18]:
class CustomMlflowModel(mlflow.pyfunc.PythonModel):

    def __init__(self, model):
        super().__init__()
        self._model = model 
    
    def predict(self, context, model_input):
        probas = self._model.predict(model_input)
        
        return probas

custom_model = CustomMlflowModel(pipline)

In [19]:
RUN_NAME = "data_check"
DIR_PATH = 'dataframe'
REGISTRY_MODEL_NAME = "churn_model_ujhmaster"

pip_requirements = "./requirements.txt"
metadata = {'model_type': 'monthly'}
input_example = X_val[:10]
signature = mlflow.models.infer_signature(X_val, prediction)

with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
    # получаем уникальный идентификатор запуска эксперимента
    run_id = run.info.run_id
    
    # логируем метрики эксперимента
    mlflow.log_metrics(stats)
    mlflow.log_metrics(metrics)
    
    # логируем файлы как артефакты эксперимента — 'columns.txt' и 'users_churn.csv'
    mlflow.log_artifact("columns.txt",artifact_path=DIR_PATH)
    mlflow.log_artifact("users_churn.csv",artifact_path=DIR_PATH)
    # логируем модель
    model_info = mlflow.pyfunc.log_model( 
            python_model=custom_model, 
			pip_requirements=pip_requirements,
            signature=signature, 
            metadata=metadata,
            input_example=input_example, 
            artifact_path='models', 
            registered_model_name=REGISTRY_MODEL_NAME,
            await_registration_for=60
		)


experiment = mlflow.get_experiment_by_name(EXPERIMENT_NAME)
# получаем данные о запуске эксперимента по его уникальному идентификатору
run = mlflow.get_run(run_id)


# проверяем, что статус запуска эксперимента изменён на 'FINISHED'
# это утверждение (assert) можно использовать для автоматической проверки того, 
# что эксперимент был завершён успешно
assert 'FINISHED' == run.info.status

# удаляем файлы 'columns.txt' и 'users_churn.csv' из файловой системы,
# чтобы очистить рабочую среду после логирования артефактов
os.remove("columns.txt")
os.remove("users_churn.csv")

  inputs = _infer_schema(model_input) if model_input is not None else None
Registered model 'churn_model_ujhmaster' already exists. Creating a new version of this model...
2024/09/03 20:00:17 INFO mlflow.tracking._model_registry.client: Waiting up to 60 seconds for model version to finish creation. Model name: churn_model_ujhmaster, version 10
Created version '10' of model 'churn_model_ujhmaster'.


In [20]:
loaded_model = mlflow.pyfunc.load_model(model_uri=model_info.model_uri)
model_predictions = loaded_model.predict(X_val)

assert model_predictions.dtype == int

print(model_predictions[:10])

Downloading artifacts:   0%|          | 0/6 [00:00<?, ?it/s]

[0 0 0 0 0 1 0 0 1 0]


In [21]:
print(y_val[:10])

6131    0
2894    0
321     0
5078    0
4654    0
4342    1
5486    0
1042    0
4558    0
1604    0
Name: target, dtype: int64


In [None]:
import boto3
def get_session_student():
    session = boto3.session.Session()

    return session.client(
        service_name='s3',
        endpoint_url='https://storage.yandexcloud.net',
        aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY")
    )

s3 = get_session_student()
bucket_name = os.environ.get('S3_BUCKET_NAME')
if s3.list_objects(Bucket=bucket_name)['Contents']:
    for key in s3.list_objects(Bucket=bucket_name)['Contents']:
        print(key['Key'])

In [23]:
client = mlflow.MlflowClient() 

REGISTRY_MODEL_NAME = 'churn_model_ujhmaster'
models = client.search_model_versions(
    filter_string=f"name = '{REGISTRY_MODEL_NAME}'"
)
print(f"Model info:\n {models}")

model_name_1 = models[-1].name
model_version_1 = models[-1].version
model_stage_1 = models[-1].current_stage

model_name_2 = models[-2].name
model_version_2 = models[-2].version
model_stage_2 = models[-2].current_stage


print(f"Текущий stage модели 1: {model_stage_1}")
print(f"Текущий stage модели 2: {model_stage_2}")

# поменяйте статус каждой модели
client.transition_model_version_stage(model_name_1, model_version_1, "production")
client.transition_model_version_stage(model_name_2, model_version_2, "staging")

# переимнуйте модель в реестре
client.rename_registered_model(
    name=REGISTRY_MODEL_NAME, 
    new_name=f"{REGISTRY_MODEL_NAME}_b2c"
) 

Model info:
 [<ModelVersion: aliases=[], creation_timestamp=1725393617047, current_stage='None', description='', last_updated_timestamp=1725393617047, name='churn_model_ujhmaster', run_id='4068bae93b394f60918670916bdde181', run_link='', source='s3://s3-student-mle-20240729-393dbfd5ab/30/4068bae93b394f60918670916bdde181/artifacts/models', status='READY', status_message='', tags={'data_verson': 'orign', 'estimator': 'logReg'}, user_id='', version='10'>, <ModelVersion: aliases=[], creation_timestamp=1725374649041, current_stage='Staging', description='', last_updated_timestamp=1725378019543, name='churn_model_ujhmaster', run_id='8cdf2268043046e78707f67fe6fc5662', run_link='', source='s3://s3-student-mle-20240729-393dbfd5ab/30/8cdf2268043046e78707f67fe6fc5662/artifacts/models', status='READY', status_message='', tags={'data_version': 'orign', 'estimator': 'logCatBoost'}, user_id='', version='9'>, <ModelVersion: aliases=[], creation_timestamp=1725374624842, current_stage='Production', descr