In [1]:
import warnings
from sklearn.exceptions import ConvergenceWarning

# Mute every warning category for the rest of the notebook so library
# chatter does not drown the cell outputs.
warnings.filterwarnings(action="ignore")
# ConvergenceWarning is already covered by the blanket filter above; the
# explicit entry records which warning the notebook expects from its models.
warnings.filterwarnings(action="ignore", category=ConvergenceWarning)

In [2]:
from abc import ABC, abstractmethod

import numpy as np
from sklearn.metrics import f1_score, make_scorer


class BestModelDetector(ABC):
    """Abstract interface shared by the model-search strategies in this notebook.

    Subclasses must implement ``fit``, ``predict`` and
    ``print_best_model_info``; the class cannot be instantiated otherwise.
    """

    # Weighted-average F1 wrapped as a scorer object for use during model search.
    scorer = make_scorer(f1_score, average='weighted')
    # Placeholders for the training data and the fitted estimator;
    # subclasses are expected to overwrite them on the instance.
    X = y = clf = None

    @abstractmethod
    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """Run the model search on features ``X`` and labels ``y``."""

    @abstractmethod
    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return predictions for ``X`` from the model selected by ``fit``."""

    @abstractmethod
    def print_best_model_info(self):
        """Report which model the search selected."""


**Датасет**

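*Признаки* (набор данных `load_wine` из scikit-learn, 13 признаков):

1 - алкоголь (alcohol)\
2 - яблочная кислота (malic_acid)\
3 - зола (ash)\
4 - щёлочность золы (alcalinity_of_ash)\
5 - магний (magnesium)\
6 - общее содержание фенолов (total_phenols)\
7 - флаваноиды (flavanoids)\
8 - нефлаваноидные фенолы (nonflavanoid_phenols)\
9 - проантоцианидины (proanthocyanins)\
10 - интенсивность цвета (color_intensity)\
11 - оттенок (hue)\
12 - OD280/OD315 разбавленных вин (od280/od315_of_diluted_wines)\
13 - пролин (proline)

*Результат*:\
Класс вина (int от 0 до 2)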

Пример данных:

In [3]:
from sklearn.datasets import load_wine
import pandas as pd
import numpy as np

# Load the bundled wine dataset and glue the feature matrix and the label
# vector into a single table, with the label as the last column "target".
wine = load_wine()
df = pd.DataFrame(
    columns=[*wine['feature_names'], 'target'],
    data=np.c_[wine['data'], wine['target']],
)
# Last expression of the cell: show the first rows as a preview.
df.head()

Unnamed: 0,alcohol,malic_acid,ash,alcalinity_of_ash,magnesium,total_phenols,flavanoids,nonflavanoid_phenols,proanthocyanins,color_intensity,hue,od280/od315_of_diluted_wines,proline,target
0,14.23,1.71,2.43,15.6,127.0,2.8,3.06,0.28,2.29,5.64,1.04,3.92,1065.0,0.0
1,13.2,1.78,2.14,11.2,100.0,2.65,2.76,0.26,1.28,4.38,1.05,3.4,1050.0,0.0
2,13.16,2.36,2.67,18.6,101.0,2.8,3.24,0.3,2.81,5.68,1.03,3.17,1185.0,0.0
3,14.37,1.95,2.5,16.8,113.0,3.85,3.49,0.24,2.18,7.8,0.86,3.45,1480.0,0.0
4,13.24,2.59,2.87,21.0,118.0,2.8,2.69,0.39,1.82,4.32,1.04,2.93,735.0,0.0


Делим на train, test, val

In [4]:
from sklearn.model_selection import train_test_split

# Fixed seed: without it every "Restart & Run All" draws a different split,
# so none of the scores reported below could be reproduced.
RANDOM_STATE = 42

# 80% of the data for training, the remaining 20% held out as the test set.
X_train_full, X_test, y_train_full, y_test = train_test_split(
    wine.data, wine.target, train_size=0.8, random_state=RANDOM_STATE
)
# Split the training part again (80/20) into train and validation sets.
# NOTE(review): X_train / X_val / y_train / y_val are not used by the cells
# visible here; both detectors are fitted on the *_full split.
X_train, X_val, y_train, y_val = train_test_split(
    X_train_full, y_train_full, train_size=0.8, random_state=RANDOM_STATE
)

Пайплайн и f1-score для TPOT:

In [5]:
import numpy as np
from tpot import TPOTClassifier


class TPOTDetector(BestModelDetector):
    """Model search delegated to ``TPOTClassifier``.

    Parameters
    ----------
    generations : int, default=5
        Forwarded to ``TPOTClassifier(generations=...)``.
    population_size : int, default=20
        Forwarded to ``TPOTClassifier(population_size=...)``.
    random_state : int or None, default=None
        Forwarded to ``TPOTClassifier(random_state=...)``. Pass an integer to
        make the search repeatable; ``None`` keeps TPOT's default behaviour.
    """

    def __init__(self, generations: int = 5, population_size: int = 20, random_state=None) -> None:
        self.generations = generations
        self.population_size = population_size
        self.random_state = random_state

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """Run the TPOT search on ``X``/``y``, scored with the shared weighted-F1 scorer."""
        self.X = X
        self.y = y
        self.clf = TPOTClassifier(
            generations=self.generations,
            population_size=self.population_size,
            verbosity=2,  # print per-generation progress while searching
            scoring=self.scorer,
            random_state=self.random_state,
        )

        self.clf.fit(X, y)

    def _require_fitted(self) -> None:
        """Raise a clear error when the detector is used before ``fit``."""
        if self.clf is None:
            # NotFittedError subclasses AttributeError, which is what the
            # unguarded ``None.predict`` access used to raise.
            from sklearn.exceptions import NotFittedError

            raise NotFittedError("TPOTDetector is not fitted yet; call fit(X, y) first.")

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict labels for ``X`` with the fitted TPOT classifier."""
        self._require_fitted()
        return self.clf.predict(X)

    def print_best_model_info(self):
        """Print the pipeline selected by TPOT and return it.

        The pipeline is still returned so callers that relied on the return
        value keep working.
        """
        self._require_fitted()
        best_pipeline = self.clf.fitted_pipeline_
        print("model", best_pipeline)
        return best_pipeline


In [7]:
# Let TPOT search on the full training split ...
tpot_clf = TPOTDetector()
tpot_clf.fit(X_train_full, y_train_full)

# ... then score the selected pipeline on the held-out test split.
# The weighted F1 is the cell's last expression, so it is shown as the output.
tpot_predicted = tpot_clf.predict(X_test)
f1_score(y_true=y_test, y_pred=tpot_predicted, average="weighted")

Version 0.11.7 of tpot is outdated. Version 0.12.0 was released 1 day ago.


Optimization Progress:   0%|          | 0/120 [00:00<?, ?pipeline/s]


Generation 1 - Current best internal CV score: 0.9715079365079365

Generation 2 - Current best internal CV score: 0.9715079365079365

Generation 3 - Current best internal CV score: 0.9782952128219708

Generation 4 - Current best internal CV score: 0.9782952128219708

Generation 5 - Current best internal CV score: 0.9782952128219708

Best pipeline: ExtraTreesClassifier(input_matrix, bootstrap=False, criterion=entropy, max_features=0.55, min_samples_leaf=2, min_samples_split=11, n_estimators=100)


1.0

Напишем свою модель.
В качестве моделей будем использовать случайные леса, градиентный бустинг и многослойный перцептрон.
Для настройки параметров будем использовать Grid Search

In [8]:
import itertools

from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.metrics import f1_score, make_scorer
from sklearn.model_selection import cross_val_score
from sklearn.neural_network import MLPClassifier

class PurePythonModelDetector(BestModelDetector):
    """Hand-written model search: exhaustive grid search over three classifiers.

    For each candidate model (random forest, gradient boosting, MLP) every
    combination from its parameter grid is scored with cross-validated
    weighted F1. The model with the highest mean score is used by ``predict``.

    Parameters
    ----------
    cv : int, default=5
        Passed as ``cv`` to ``cross_val_score``.
    random_state : int or None, default=None
        Passed as ``random_state`` to each candidate model. ``None`` keeps
        the estimators' default (unseeded) behaviour.
    """

    def __init__(self, cv: int = 5, random_state=None) -> None:
        self.cv = cv
        self.random_state = random_state

        self.models = {
            "random_forest": RandomForestClassifier(random_state=random_state),
            "gradient_boosting": GradientBoostingClassifier(random_state=random_state),
            "mlp": MLPClassifier(random_state=random_state),
        }

        # Grid per model: parameter name -> candidate values. Every
        # combination (cartesian product) is evaluated.
        self.param_space = {
            "random_forest": {
                "n_estimators": (5, 10, 25, 50, 100, 200),
                "max_depth": (1, 2, 3, 5, 10, 20),
            },
            "gradient_boosting": {
                "n_estimators": (5, 10, 25, 50, 100, 200),
                "learning_rate": (0.001, 0.01, 0.05, 0.1, 0.2),
            },
            "mlp": {
                "hidden_layer_sizes": (1, 5, 10, 25, 50, 100, 200, 500, 1000),
                "alpha": (0.0001, 0.001, 0.005, 0.01, 0.05, 0.1, 0.2),
            },
        }
        self._reset_search_state()

    def _reset_search_state(self) -> None:
        """Forget the results of any previous search."""
        self.best_params = {}
        # -inf rather than 0: a model whose best score is exactly 0 must
        # still be selectable, otherwise no winner is ever recorded.
        self.best_score = float("-inf")
        self.best_model = None
        self.clf = None

    def _require_fitted(self) -> None:
        """Raise a clear error when no model has been selected yet."""
        if self.best_model is None:
            from sklearn.exceptions import NotFittedError

            raise NotFittedError(
                "PurePythonModelDetector has no selected model yet; call fit(X, y) first."
            )

    def grid_search(self, model_name: str, space: dict, X: np.ndarray, y: np.ndarray) -> None:
        """Grid-search one model, refit it with its best parameters, update the winner.

        Parameters
        ----------
        model_name : str
            Key into ``self.models``.
        space : dict
            Parameter name -> iterable of candidate values.
        X, y : np.ndarray
            Data used both for cross-validation and for the final refit.

        Raises
        ------
        ValueError
            If no parameter combination produced a comparable score
            (for example every cross-validation score was NaN).
        """
        model = self.models[model_name]
        param_names = list(space)
        best_score = float("-inf")
        best_params = None

        for values in itertools.product(*space.values()):
            params_dict = dict(zip(param_names, values))
            model.set_params(**params_dict)

            # Use the scorer shared through the base class instead of
            # rebuilding an identical one for every grid point.
            score = np.mean(cross_val_score(model, X, y, cv=self.cv, scoring=self.scorer))
            if score > best_score:
                best_score = score
                best_params = params_dict

        if best_params is None:
            raise ValueError(
                f"grid search for {model_name!r} produced no valid cross-validation score"
            )

        self.best_params[model_name] = best_params
        # Refit on all the supplied data so the model is ready for predict().
        model.set_params(**best_params)
        model.fit(X, y)

        if best_score > self.best_score:
            self.best_score = best_score
            self.best_model = model_name

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """Grid-search every candidate model on ``X``/``y`` and remember the best one."""
        self.X = X
        self.y = y
        # Start from a clean slate so a repeated fit() is not compared
        # against the best score of an earlier run on different data.
        self._reset_search_state()

        for model_name in self.models.keys():
            space = self.param_space[model_name]
            self.grid_search(model_name, space, X, y)

        if self.best_model is not None:
            self.clf = self.models[self.best_model]

    def print_best_model_info(self) -> None:
        """Print the winning model's name, its parameters and its score.

        The value labelled ``train_f1`` is the mean cross-validated weighted F1
        obtained on the data passed to ``fit``.
        """
        self._require_fitted()
        print(
            "model",
            self.best_model,
            "params",
            self.best_params[self.best_model],
            "train_f1",
            self.best_score,
        )

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict labels for ``X`` with the winning, refitted model."""
        self._require_fitted()
        return self.models[self.best_model].predict(X)


In [11]:
%%time

# Run the hand-written search on the full training split and report the winner.
my_automl = PurePythonModelDetector()
my_automl.fit(X_train_full, y_train_full)
my_automl.print_best_model_info()

# Evaluate the selected model on the held-out test split with weighted F1.
pred = my_automl.predict(X_test)
print("f1 test", f1_score(y_true=y_test, y_pred=pred, average='weighted'))

model random_forest params {'n_estimators': 200, 'max_depth': 5} train_f1 0.9711917047520762
f1 test 1.0
CPU times: user 1min 36s, sys: 0 ns, total: 1min 36s
Wall time: 1min 36s
