<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Тюнинг-оптюны-для-деревянных-моделей" data-toc-modified-id="Тюнинг-оптюны-для-деревянных-моделей-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Тюнинг оптюны для деревянных моделей</a></span></li><li><span><a href="#Тюнинг-оптюной-общий" data-toc-modified-id="Тюнинг-оптюной-общий-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Тюнинг оптюной общий</a></span></li><li><span><a href="#Проверка-на-адекватность" data-toc-modified-id="Проверка-на-адекватность-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Проверка на адекватность</a></span></li><li><span><a href="#Масштабирование-данных" data-toc-modified-id="Масштабирование-данных-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Масштабирование данных</a></span></li><li><span><a href="#Заполнение-пропусков" data-toc-modified-id="Заполнение-пропусков-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Заполнение пропусков</a></span></li><li><span><a href="#Визуализация-данных" data-toc-modified-id="Визуализация-данных-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Визуализация данных</a></span></li></ul></div>

# PIPELINE

А точнее набор несложных функций для удобной работы

In [1]:
# Импорт библиотек
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import time
import optuna
from optuna.integration import OptunaSearchCV

from sklearn.model_selection import train_test_split, TimeSeriesSplit
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, MaxAbsScaler

from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestRegressor
from sklearn.dummy import DummyClassifier

from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.dummy import DummyRegressor

from catboost import CatBoostRegressor
from catboost import CatBoostClassifier

from lightgbm import LGBMRegressor
from lightgbm import LGBMClassifier

  _numeric_index_types = (pd.Int64Index, pd.Float64Index, pd.UInt64Index)
  _numeric_index_types = (pd.Int64Index, pd.Float64Index, pd.UInt64Index)
  _numeric_index_types = (pd.Int64Index, pd.Float64Index, pd.UInt64Index)


## Тюнинг оптюны для деревянных моделей
Параметры задавать необязательно. Поддерживаемые модели: DecisionTree, RandomForest, CatBoost, LGBM

In [2]:
# Функция для получения деревянной модели с наилучшими гиперпараметрами с помощью optuna
# На вход подаётся модель и тренировочные признаки
# На выходе получаем модель с найденными гиперпараметрами

def tuning_optuna_cv(model, X, y, params=None, cv=5, timeseries_cv=0, n_trials=10, scoring=None, random_state=None, get_time=False):
    """
    model - модель на вход
    X - признаки
    y - целевой признак
    params - словарь параметров (должен быть заточен под оптюновский интерфейс)
    cv_num - количество подвыборок для кросс-валидации
    timeseries_cv - для временных рядов, количество подвыборок для кросс-валидации
    n_trials - количество проводимых экспериментов для поиска гиперпараметров
    scoring - метрика, на основе которой будут подбираться гиперпараметры модели; если не указана, используется встроенная
                                        в модель метрика (см. https://scikit-learn.org/stable/modules/model_evaluation.html)
    get_time - получение затраченного времени на подбор гиперпараметров в минутах
    
    """
    opt_distr_int = optuna.distributions.IntUniformDistribution
    opt_distr_float = optuna.distributions.LogUniformDistribution
    
    if timeseries_cv >= 2:
        cv = TimeSeriesSplit(n_splits=timeseries_cv)
        
    if get_time == True:
        start_time = time.time()
    
    if (type(model).__name__ == 'DecisionTreeRegressor' or 
        type(model).__name__ == 'DecisionTreeClassifier'):
        
        param_distributions = {
            'min_samples_leaf':opt_distr_int(2,50),
            'min_samples_split':opt_distr_int(2,50),
            'max_depth':opt_distr_int(1,50)
        }
        
        if params != None:
            param_distributions = params
        
        optuna_search = OptunaSearchCV(
            model, param_distributions, cv=cv,
            n_trials=n_trials, scoring=scoring,
            random_state=random_state
        )
        optuna_search.fit(X, y)
    
        best_model = optuna_search.best_estimator_
        
        print(f'Подобранные параметры модели: {optuna_search.best_params_}\n')
        print(f'Полученная метрика: {optuna_search.best_score_}\n')
        
        if get_time == True:
            end_time = time.time() - start_time
            print(f'Затраченное время на подбор: {end_time/60:.2f} мин')
        
        return best_model
    
    if (type(model).__name__ == 'RandomForestRegressor' or 
        type(model).__name__ == 'RandomForestClassifier'):
        
        param_distributions = {
            'n_estimators':opt_distr_int(50,200),
            'min_samples_split':opt_distr_int(2,25),
            'max_depth':opt_distr_int(1,50),
            'min_samples_leaf':opt_distr_int(2,25)
        }
        
        if params != None:
            param_distributions = params
        
        optuna_search = OptunaSearchCV(
            model, param_distributions, cv=cv,
            n_trials=n_trials, scoring=scoring,
            random_state=random_state
        )
        optuna_search.fit(X, y)
    
        best_model = optuna_search.best_estimator_
        
        print(f'Подобранные параметры модели: {optuna_search.best_params_}\n')
        print(f'Полученная метрика: {optuna_search.best_score_}\n')
        
        if get_time == True:
            end_time = time.time() - start_time
            print(f'Затраченное время на подбор: {end_time/60:.2f} мин')
            
        return best_model
    
    if (type(model).__name__ == 'CatBoostRegressor' or 
        type(model).__name__ == 'CatBoostClassifier'):
        
        param_distributions = {
            'iterations':opt_distr_int(10,300),
            'learning_rate':opt_distr_float(0.01,1.0),
            'depth':opt_distr_int(1,10)
        }
        
        if params != None:
            param_distributions = params
        
        optuna_search = OptunaSearchCV(
            model, param_distributions, cv=cv,
            n_trials=n_trials, scoring=scoring,
            random_state=random_state
        )
        optuna_search.fit(X, y)
    
        best_model = optuna_search.best_estimator_
        
        print(f'Подобранные параметры модели: {optuna_search.best_params_}\n')
        print(f'Полученная метрика: {optuna_search.best_score_}\n')
        
        if get_time == True:
            end_time = time.time() - start_time
            print(f'Затраченное время на подбор: {end_time/60:.2f} мин')
            
        return best_model
    
    if (type(model).__name__ == 'LGBMRegressor' or 
        type(model).__name__ == 'LGBMClassifier'):
        
        param_distributions = {
            'n_estimators':opt_distr_int(10,300),
            'learning_rate':opt_distr_float(0.01,1.0),
            'max_depth':opt_distr_int(1,16)
        }
        
        if params != None:
            param_distributions = params
        
        optuna_search = OptunaSearchCV(
            model, param_distributions, cv=cv,
            n_trials=n_trials, scoring=scoring,
            random_state=random_state
        )
        optuna_search.fit(X, y)
    
        best_model = optuna_search.best_estimator_
        
        print(f'Подобранные параметры модели: {optuna_search.best_params_}\n')
        print(f'Полученная метрика: {optuna_search.best_score_}\n')
        
        if get_time == True:
            end_time = time.time() - start_time
            print(f'Затраченное время на подбор: {end_time/60:.2f} мин')
            
        return best_model
    
    else:
        print('Модель не поддерживается! Попробуйте другую')

## Тюнинг оптюной общий

In [3]:
# Функция для получения модели с наилучшими гиперпараметрами с помощью optuna
# На вход подаётся модель и признаки тренировочной выборки
# На выходе получаем модель с найденными гиперпараметрами

def tuning_optuna_cv(model, X, y, params, cv=5, timeseries_cv=0, n_trials=10, scoring=None, random_state=None, get_time=False):
    
    """
    model - модель на вход
    X - признаки
    y - целевой признак
    params - словарь параметров (должен быть заточен под оптюновский интерфейс)
    cv_num - количество подвыборок для кросс-валидации
    timeseries_cv - для временных рядов, количество подвыборок для кросс-валидации
    n_trials - количество проводимых экспериментов для поиска гиперпараметров
    scoring - метрика, на основе которой будут подбираться гиперпараметры модели; если не указана, используется встроенная
                                        в модель метрика (см. https://scikit-learn.org/stable/modules/model_evaluation.html)
    get_time - получение затраченного времени на подбор гипепараметров в минутах
    """
    
    if timeseries_cv >= 2:
        cv = TimeSeriesSplit(n_splits=timeseries_cv)
        
    if get_time == True:
        start_time = time.time()
    
    optuna_search = OptunaSearchCV(model, params, cv=cv, n_trials=n_trials, scoring=scoring, random_state=random_state)
    optuna_search.fit(X, y)
    
    best_model = optuna_search.best_estimator_
        
    print(f'Подобранные параметры модели: {optuna_search.best_params_}\n')
    print(f'Полученная метрика: {optuna_search.best_score_}\n')
        
    if get_time == True:
        end_time = time.time() - start_time
        print(f'Затраченное время на подбор: {end_time/60:.2f} мин')
        
    return best_model

## Проверка на адекватность

In [4]:
def dummytest(X_train, X_test, y_train, y_test, scorer, regressor=True):
    
    for strat in ['mean', 'median', 'constant']:
        
        if strat == 'constant':
            # Используем моду при стратегии constant
            if regressor == True:
                model = DummyRegressor(strategy=strat, constant=y_train.mode())
                strat='mode'
            else:
                model = DummyClassifier(strategy=strat, constant=y_train.mode())
                strat='mode'
                
        else:
            if regressor == True:
                model = DummyRegressor(strategy=strat)
            else:
                model = DummyClassifier(strategy=strat)
            
        model.fit(X_train, y_train)
        predictions = model.predict(X_test)
        score = scorer(y_test, predictions)
        print(f'Метрика модели пустышки при использовании стратегии {strat}: {score:.3f}')

## Масштабирование данных

In [5]:
# Простенькая функция для масштабирования данных
def EZScaler(X_train, X_test, X_valid=None, i=0):
    """
    i=0 - StandardScaler
    i=1 - MinMaxScaler
    i=2 - RobustScaler
    i=3 - MaxAbsScaler
    """
    if i == 0:
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        
        if X_valid != None:
            X_valid_scaled = scaler.transform(X_valid)
            return X_train_scaled, X_test_scaled, X_valid_scaled
        
        return X_train_scaled, X_test_scaled
    
    if i == 1:
        scaler = MinMaxScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        
        if X_valid != None:
            X_valid_scaled = scaler.transform(X_valid)
            return X_train_scaled, X_test_scaled, X_valid_scaled
        
        return X_train_scaled, X_test_scaled
    
    if i == 2:
        scaler = RobustScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        
        if X_valid != None:
            X_valid_scaled = scaler.transform(X_valid)
            return X_train_scaled, X_test_scaled, X_valid_scaled
        
        return X_train_scaled, X_test_scaled
    
    if i == 3:
        scaler = MaxAbsScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        
        if X_valid != None:
            X_valid_scaled = scaler.transform(X_valid)
            return X_train_scaled, X_test_scaled, X_valid_scaled
        
        return X_train_scaled, X_test_scaled

## Заполнение пропусков

In [6]:
# Функция заполняет NaN в столбцах на основе группировки по определённому столбцу
# Так заполнение происходит не единым значением, а с учётом особенностей другого столбца,
# который может содержать дополнительную информацию
def fillna_based_on_col(df, columns_to_fill, col_base, fill_type='median'):
    """
    columns_to_fill - столбцы, в которых заполняется NaN
    column_base - столбец, на основе которого будут заполняться NaN
    fill_method - способ заполнения
    """
    new_df = df.copy()
    for col in columns_to_fill:
        new_df.loc[:, col] = new_df.loc[:, col].fillna(data.groupby(col_base)[col].transform(fill_type))
    return new_df

## Визуализация данных

In [7]:
# Визуализация важности фичей модели
def feat_importance_plot(model, feature_names=None, figsize=(16,6)):
    feature_importances = pd.Series(model.feature_importances_, index=feature_names).sort_values()
    feature_importances.plot(kind='bar', grid=True, figsize=figsize)
    plt.title(f'Важность признаков на модели {type(model).__name__}')
    plt.ylabel('Importance')
    plt.show()