#### Часть кода пайплайна для генерации модели с предыдущего проекта

In [1]:
# Import necessary standard libraries
import os
import time
import warnings

# Import third-party libraries for AWS interactions, model handling, HTTP requests, random number generation, and data manipulation
import boto3
import joblib
import requests
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import geopy
from geopy.distance import geodesic

# Load environment variables from a .env file and pretty print module for more readable outputs
from dotenv import load_dotenv
from pprint import pprint

# SQLAlchemy for database interactions
from sqlalchemy import create_engine

# Machine learning and feature engineering libraries
from autofeat import AutoFeatRegressor
from catboost import CatBoostRegressor
from mlxtend.feature_selection import SequentialFeatureSelector

# Import necessary components from scikit-learn for building and evaluating machine learning models
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import Ridge
from sklearn.metrics import mean_absolute_error, r2_score, make_scorer, root_mean_squared_error
from sklearn.model_selection import train_test_split, cross_validate
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, KBinsDiscretizer, PowerTransformer, PolynomialFeatures, scale

# Configure pandas display options for better readability of floating point numbers
pd.options.display.float_format = '{:,.2f}'.format

# Set warning level for matplotlib to suppress warnings
plt.set_loglevel('WARNING')

# Define constants for database interactions and geographical calculations
TABLE_NAME = 'clean_flats'
MOSCOW_CENTER = (55.751610795409086, 37.61799504180682)

# Set random state for reproducibility
RANDOM_STATE = 42
random.seed(RANDOM_STATE) 
np.random.seed(RANDOM_STATE)

# Load environment variables from a .env file
load_dotenv()


True

In [2]:
# Function to create a database engine
def get_engine():
    """
    Create and return a SQLAlchemy engine using environment variables for database connection.
    """
    return create_engine(f"postgresql://{os.getenv('DB_DESTINATION_USER')}:"
                         f"{os.getenv('DB_DESTINATION_PASSWORD')}@"
                         f"{os.getenv('DB_DESTINATION_HOST')}:"
                         f"{os.getenv('DB_DESTINATION_PORT')}/"
                         f"{os.getenv('DB_DESTINATION_NAME')}")

# Function to load a dataframe from a database table
def load_df(table_name):
    """
    Load and return a dataframe from the specified table in the database.
    """
    return pd.read_sql(sql=f'SELECT * FROM {table_name}', con=get_engine())

# Function to display statistics of a dataframe
def display_statistics(data):
    """
    Display various statistics for each column in the dataframe.
    """
    def lo_hi_count(data, col, low=True):
        """
        Calculate and return the count of outliers in a column based on the IQR method.
        """
        if data[col].dtype not in [float, int, 'datetime64[ns]']:
            return '---'
        Q1 = np.nanquantile(data[col], 0.25)
        Q3 = np.nanquantile(data[col], 0.75)
        if low:
            return data[data[col] <= (Q1 - 1.5 * (Q3 - Q1))][col].count()
        else:
            return data[data[col] >= (Q3 + 1.5 * (Q3 - Q1))][col].count()
    
    # Create and return a dataframe with statistics for each column
    return pd.DataFrame(
        {
            'type': [data[x].dtypes for x in data.columns],
            'count': [data[x].count() for x in data.columns],
            'NaNs': [data[x].isna().sum() for x in data.columns],
            'zero_values': [data[x].eq(0).sum() for x in data.columns],
            'unique_values': [data[x].nunique() for x in data.columns],
            'top_3_freq': [data[x].value_counts().head(3).to_dict() for x in data.columns],
            'min': [data[x].min() if data[x].dtype != object else '---' for x in data.columns],
            'mean': [data[x].mean() if data[x].dtype != object else '---' for x in data.columns],
            'max': [data[x].max() if data[x].dtype != object else '---' for x in data.columns],
            'std': [data[x].std() if data[x].dtype != object else '---' for x in data.columns],
            'lo_count': [lo_hi_count(data, x) for x in data.columns],
            'hi_count': [lo_hi_count(data, x, low=False) for x in data.columns],
        }, index=[x for x in data.columns]
    )

# Function to get metrics for a model
def get_metrics(model, x_train, y_train, x_val, y_val, need_fit=True):
    """
    Fit the model (if needed), make predictions, and return performance metrics.
    """
    start_time = time.time()
    if isinstance(y_train, pd.core.frame.DataFrame):
        y_train = y_train.values.ravel()
    if need_fit:
        model = clone(model)
        model.fit(x_train, y_train)
    elapsed_fit_time = time.time() - start_time

    start_time = time.time()
    y_pred = model.predict(x_val)
    elapsed_predict_time = time.time() - start_time

    metrics = {}
    metrics['mae'] = mean_absolute_error(y_val, y_pred)
    metrics['rmse'] = root_mean_squared_error(y_val, y_pred)
    metrics['r2'] = r2_score(y_val, y_pred)
    metrics['fit_time'] = elapsed_fit_time
    metrics['predict_time'] = elapsed_predict_time
    return metrics

# Function to split the dataframe into training, validation, and testing sets
def split_wrapper(df, target='price', train_size=0.7, test_size=0.5):
    """
    Split the dataframe into training, validation, and test sets.
    """
    X = df.drop(target, axis=1).copy()
    y = df[target].copy()
    X_train, X_val_test, y_train, y_val_test = train_test_split(X, y, train_size=train_size, random_state=RANDOM_STATE)
    X_val, X_test, y_val, y_test = train_test_split(X_val_test, y_val_test, test_size=test_size, random_state=RANDOM_STATE)

    return X_train, y_train, X_val, y_val, X_test, y_test

# Function to get cross-validated metrics for a model
def get_metricsCV(model, x_train, y_train, cv=5):
    """
    Perform cross-validation and return averaged performance metrics.
    """
    metrics = {}
    scoring = ['neg_mean_absolute_error', 'neg_root_mean_squared_error', 'r2']
    cv_results = cross_validate(model, x_train, y_train, cv=cv, scoring=scoring, return_train_score=False)
    metrics['fit_time'] = np.mean(cv_results['fit_time'])
    metrics['predict_time'] = np.mean(cv_results['score_time'])
    metrics['mae'] = -np.mean(cv_results['test_neg_mean_absolute_error'])
    metrics['rmse'] = -np.mean(cv_results['test_neg_root_mean_squared_error'])
    metrics['r2'] = np.mean(cv_results['test_r2'])

    return metrics


In [3]:
# Load the dataframe from the specified table and drop the 'id' and 'flat_id' columns
df = load_df(TABLE_NAME).drop(['id', 'flat_id'], axis=1)

# Print the number of duplicate rows in the dataframe
print(f'Дубликатов: {df.duplicated().sum()}')

# Display various statistics for each column in the dataframe, sorted by the number of unique values
display_statistics(df).sort_values(by='unique_values')

Дубликатов: 0


Unnamed: 0,type,count,NaNs,zero_values,unique_values,top_3_freq,min,mean,max,std,lo_count,hi_count
is_apartment,int64,130755,0,129546,2,"{0: 129546, 1: 1209}",0.0,0.01,1.0,0.1,129546,130755
has_elevator,int64,130755,0,13353,2,"{1: 117402, 0: 13353}",0.0,0.9,1.0,0.3,130755,117402
rooms,int64,130755,0,0,5,"{2: 49303, 1: 38906, 3: 34205}",1.0,2.11,5.0,0.94,0,0
building_type_int,int64,130755,0,1726,6,"{4: 73635, 2: 22764, 1: 21269}",0.0,3.25,6.0,1.46,0,0
floor,int64,130755,0,0,44,"{2: 13606, 3: 12656, 5: 11812}",1.0,7.42,44.0,5.55,0,3779
floors_total,int64,130755,0,0,54,"{9: 23406, 17: 21647, 12: 16086}",3.0,14.04,56.0,6.68,0,3337
ceiling_height,float64,130755,0,0,61,"{2.640000104904175: 54855, 3.0: 20987, 2.70000...",2.48,2.75,4.1,0.2,0,8408
build_year,int64,130755,0,0,118,"{2017: 4071, 2018: 3973, 1968: 3272}",1901.0,1986.46,2023.0,21.99,623,0
flats_count,int64,130755,0,0,705,"{80: 3926, 144: 2632, 84: 2475}",1.0,252.23,1630.0,206.71,0,6073
kitchen_area,float64,130755,0,0,2627,"{6.0: 14581, 10.0: 12389, 9.0: 8907}",2.9,10.11,70.0,5.17,0,9584


In [4]:
# Define the target variable
target = ['price']

# Identify categorical columns: columns with 6 or fewer unique values
cat_columns = [x for x in df.columns if df[x].nunique() <= 6]

# Identify numerical columns: columns that are not categorical and not the target variable
num_columns = [x for x in df.columns if x not in cat_columns and x != target[0]]

# Further categorize numerical columns based on their characteristics
num_discrete_columns = ['floor', 'ceiling_height', 'flats_count', 'floors_total']
num_time_columns = ['building_id', 'build_year']
num_area_columns = ['kitchen_area', 'living_area', 'total_area']
num_geo_columns = ['latitude', 'longitude']

# Columns to be transformed using KBinsDiscretizer
kbins_columns = num_discrete_columns + num_time_columns + num_geo_columns

# Define columns for AutoFeat transformations
autofeat_cat_columns = ['bin5__' + x for x in kbins_columns] + ['remainder__' + x for x in cat_columns]
autofeat_feateng_columns = (
    ['power__' + x for x in num_area_columns] 
    + ['add__scale__distance'] 
    + ['scale__' + x for x in kbins_columns]
)
autofeat_columns = autofeat_cat_columns + autofeat_feateng_columns

# Define columns for polynomial feature transformations
polyfeat_columns = (
    ['power__' + x for x in num_area_columns] 
    + ['add__scale__distance'] 
    + ['scale__' + x for x in kbins_columns]
)

In [5]:
# Split the dataframe into training, validation, and test sets
# The function 'split_wrapper' splits the data and returns the features (X) and target (y) for each set
X_train, y_train, X_val, y_val, X_test, y_test = split_wrapper(df)

In [6]:
# Function to calculate the distance from a given point to the center of Moscow
def calculate_distance(row):
    return geopy.distance.geodesic(MOSCOW_CENTER, (row['latitude'], row['longitude'])).km

# Custom transformer to add a new feature 'scale__distance' representing the distance to Moscow center
class FeatureAdder(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self
    
    def transform(self, X):
        X_new = pd.DataFrame(X).copy()
        X_new['scale__distance'] = scale(X_new.apply(calculate_distance, axis=1))
        return pd.DataFrame(X_new['scale__distance'])
    
    def get_feature_names_out(self, input_features=None):
        return ['scale__distance']

# Wrapper for AutoFeatRegressor to integrate it into scikit-learn pipelines
class AutoFeatWrapper(BaseEstimator, TransformerMixin):
    def __init__(self, model):
        self.model = model
    
    def fit(self, X, y=None):
        self.model.fit(X, y)
        self.feature_names_out = self.model.all_columns_
        return self
    
    def transform(self, X):
        index = X.index
        transformed_X = self.model.transform(X)
        transformed_df = pd.DataFrame(transformed_X, columns=self.feature_names_out)
        transformed_df.index = index
        return transformed_df

    def get_feature_names_out(self, X=None):
        return self.feature_names_out

# Custom ColumnTransformer that returns a DataFrame instead of a numpy array
class DataFrameColumnTransformer(ColumnTransformer):
    def __init__(self, transformers, remainder='passthrough'):
        super().__init__(transformers, remainder=remainder)

    def transform(self, X):
        X_transformed = super().transform(X)
        feature_names_out = self.get_feature_names_out()
        return pd.DataFrame(X_transformed, columns=feature_names_out)

    def fit_transform(self, X, y=None):
        X_transformed = super().fit_transform(X, y)
        feature_names_out = self.get_feature_names_out()
        return pd.DataFrame(X_transformed, columns=feature_names_out)

# Custom transformer to remove duplicate columns
class DuplicatesRemover(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        X_df = pd.DataFrame(X).copy()
        self.original_columns_ = X_df.columns.tolist()
        X_df = X_df.T.drop_duplicates().T
        self.columns_to_keep_ = X_df.columns.tolist()
        self.dropped_columns_ = [col for col in self.original_columns_ if col not in self.columns_to_keep_]
        return self
    
    def transform(self, X):
        X_df = pd.DataFrame(X, columns=self.original_columns_)
        return X_df[self.columns_to_keep_]

    def get_feature_names_out(self, input_features=None):
        return self.columns_to_keep_

# Define a pipeline for power transformation and scaling
power_scale_transformer = Pipeline([
    ('pwr', PowerTransformer(method='box-cox')),
    ('scale', StandardScaler())
])

# Define a preprocessor to apply transformations to specified columns
preprocessor = ColumnTransformer(
    transformers=[
        ('power', power_scale_transformer, num_area_columns),
        ('bin5', KBinsDiscretizer(n_bins=5, encode='ordinal', strategy='kmeans', subsample=None, random_state=RANDOM_STATE), kbins_columns),
        ('add', FeatureAdder(), num_geo_columns),
        ('scale', StandardScaler(), kbins_columns)
    ], 
    remainder='passthrough'
)

# Set the output of the preprocessor to be a DataFrame
preprocessor.set_output(transform='pandas')

# Define a feature generator to apply AutoFeat and polynomial feature transformations
feature_generator = ColumnTransformer(
    transformers=[
        ('auto_feat', AutoFeatWrapper(
            AutoFeatRegressor(
                categorical_cols=autofeat_cat_columns, 
                feateng_cols=autofeat_feateng_columns, 
                verbose=0, 
                feateng_steps=1, 
                n_jobs=-1
            )
        ), autofeat_columns),
        ('poly_features', PolynomialFeatures(
            degree=2, interaction_only=False, 
            include_bias=False
        ), polyfeat_columns)
    ], 
    remainder='passthrough'
)

# Set the output of the feature generator to be a DataFrame
feature_generator.set_output(transform='pandas');


In [7]:
# Custom implementation of SequentialFeatureSelector to handle batch processing and customized output
class CustomSequentialFeatureSelector(SequentialFeatureSelector):
    
    def __init__(
            self, 
            estimator, 
            k_features, 
            forward=True, 
            floating=False, 
            scoring=None, 
            cv=0,
            n_jobs=1, 
            pre_dispatch='2*n_jobs', 
            clone_estimator=True, 
            verbose=0, 
            batch=None
            ):
        self.batch = batch
        super().__init__(
            estimator=estimator, 
            k_features=k_features, 
            forward=forward, 
            floating=floating,
            scoring=scoring, 
            cv=cv, 
            n_jobs=n_jobs, 
            pre_dispatch=pre_dispatch, 
            clone_estimator=clone_estimator, 
            verbose=verbose
            )

    def fit_transform(self, X, y=None, **fit_params):
        """
        Fit the model with optional batch processing and transform the data.
        """
        X_full = X.copy()
        if self.batch is not None:
            print('Using batch: ', self.batch)
            X = X[:self.batch]
            y = y[:self.batch]
        self.fit(X, y, **fit_params)
        self.selected_features_ = list(self.k_feature_names_)
        selected_data = self.transform(X_full)
        return pd.DataFrame(selected_data, columns=self.get_feature_names_out(X_full.columns))

    def get_feature_names_out(self, input_features=None):
        """
        Return the names of the selected features.
        """
        return self.selected_features_

    def set_output(self, transform=None):
        """
        Placeholder method to comply with scikit-learn's API.
        """
        pass

# Define a custom scorer using root mean squared error
scorer = make_scorer(root_mean_squared_error, greater_is_better=False)

# Define forward and backward feature selectors using the custom sequential feature selector
sfs_forward = CustomSequentialFeatureSelector(
    estimator=Ridge(), 
    k_features=10, 
    forward=True, 
    scoring=scorer, 
    verbose=0, 
    cv=3, 
    n_jobs=1
) 

sfs_backward = CustomSequentialFeatureSelector(
    estimator=Ridge(), 
    k_features=10, 
    forward=False,
    scoring=scorer, 
    verbose=0, 
    cv=0, 
    n_jobs=1, 
    batch=5000
)

# Combine the forward and backward feature selectors into a feature union
feature_union = FeatureUnion([
    ('sfs_forward', sfs_forward),
    ('sfs_backward', sfs_backward)
])

# Set the output of the feature union to be a DataFrame and enable verbose feature names
feature_union.set_output(transform='pandas')
feature_union.set_params(verbose_feature_names_out=True)

# Define the machine learning pipeline
pipeline = Pipeline([
    (
        'processor', Pipeline([
            ('preprocessor', preprocessor),  # Apply the preprocessor transformations
            ('feature_generator', feature_generator),  # Generate additional features
            ('drop_duplicates_1', DuplicatesRemover()),  # Remove duplicate columns
            ('feature_union', feature_union),  # Apply feature selection
            ('drop_duplicates_2', DuplicatesRemover()),  # Remove duplicate columns again
        ])
    ), 
    (
        'regressor', CatBoostRegressor(
            iterations=784, 
            learning_rate=0.0933769458215897, 
            depth=9, 
            l2_leaf_reg=6.25803192908997,
            loss_function='RMSE',
            verbose=0,
            random_state=RANDOM_STATE
        )
    )  # Train a CatBoostRegressor with the specified hyperparameters
])


In [8]:
%%time

# Train the pipeline with the training data, ignoring future warnings
with warnings.catch_warnings():
    warnings.simplefilter('ignore', category=FutureWarning)
    pipeline.fit(X_train, y_train)

# Calculate performance metrics for the pipeline using the test data
metrics = get_metrics(pipeline, X_train, y_train, X_test, y_test, need_fit=False)

# Print the R^2 score of the pipeline
print(f"{metrics['r2']=}")

# Print a test prediction using a sample from the training data
print(f"Тестовое предсказание: {pipeline.predict(X_train.sample())}")

# Print the number of features used by the regressor in the pipeline
print(f"Количество признаков: {len(pipeline['regressor'].feature_names_)}")

# Print the names of the features used by the regressor
pprint(pipeline['regressor'].feature_names_)


Using batch:  5000
metrics['r2']=0.8839572763738286
Тестовое предсказание: [18163527.28321603]
Количество признаков: 15
['sfs_forward__auto_feat__power__total_area',
 'sfs_forward__auto_feat__add__scale__distance',
 'sfs_forward__auto_feat__exp(power__total_area)',
 'sfs_forward__auto_feat__Abs(power__total_area)',
 'sfs_forward__auto_feat__Abs(add__scale__distance)',
 'sfs_forward__poly_features__power__living_area add__scale__distance',
 'sfs_forward__poly_features__power__total_area scale__ceiling_height',
 'sfs_forward__poly_features__add__scale__distance scale__ceiling_height',
 'sfs_forward__poly_features__scale__ceiling_height scale__floors_total',
 'sfs_forward__poly_features__scale__ceiling_height scale__building_id',
 'sfs_backward__auto_feat__power__total_area**3',
 'sfs_backward__auto_feat__Abs(scale__longitude)',
 'sfs_backward__poly_features__power__total_area^2',
 'sfs_backward__poly_features__scale__ceiling_height scale__build_year',
 'sfs_backward__poly_features__scale

На предыдущем спринте эта модель была залогирована через MLFlow в object storage 
```
RUN_NAME = "6 этап: сохранение окончательной версии модели"
REGISTRY_MODEL_NAME = 'model_sprint_2_stage_6'

experiment_id = mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id
model = pipeline

with open("selected_features.txt", "w") as f:
    for line in list(feature_names):
        f.write(f"{line}\n")

with mlflow.start_run(run_name=RUN_NAME, experiment_id=experiment_id) as run:
    mlflow.sklearn.log_model(sk_model=model, 
        artifact_path='models', 
        registered_model_name=REGISTRY_MODEL_NAME, 
        signature=mlflow.models.infer_signature(X_test, model.predict(X_test)), 
        input_example = X_test[:10], 
        await_registration_for=60, 
        pip_requirements='../requirements.txt')
    mlflow.log_params(model.get_params())
    mlflow.log_artifact('selected_features.txt', "artifacts") 
    mlflow.log_metrics(metrics)
```

In [9]:
# Define the local path to save the downloaded model
local_model_path = 'loaded_model.pkl'

# Create an S3 client using the AWS credentials and S3 endpoint from environment variables
s3_client = boto3.client(
    's3',
    aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
    endpoint_url=os.environ['MLFLOW_S3_ENDPOINT_URL']
)

# Download the model file from the S3 bucket to the local path
s3_client.download_file(os.getenv('S3_BUCKET_NAME'), os.getenv('MODEL_FILE_KEY'), local_model_path)

# Load the model from the local file
model = joblib.load(local_model_path)

# Print a message indicating successful model loading with details about the model
print(f"Модель {model.__class__.__module__}.{model.__class__.__name__}."
      f"{model._final_estimator} успешно загружена в файл: {local_model_path}")

# Print a test prediction using a sample from the test data
print(f"Тестовое предсказание: {pipeline.predict(X_test.sample())=}")


Модель sklearn.pipeline.Pipeline.<catboost.core.CatBoostRegressor object at 0x7f5e7c4ff8b0> успешно загружена в файл: loaded_model.pkl
Тестовое предсказание: pipeline.predict(X_test.sample())=array([8299759.40921049])
