diff --git a/src/test.py b/src/test.py
index 16a429a..169971b 100644
--- a/src/test.py
+++ b/src/test.py
@@ -1,90 +1,46 @@
 import numpy as np
-import matplotlib.pyplot as plt
-
-from thefittest.optimizers import SelfCGP
-from thefittest.optimizers import SHADE
-from thefittest.benchmarks import BanknoteDataset, IrisDataset
-from thefittest.classifiers._gpnnclassifier import GeneticProgrammingNeuralNetClassifier2
-from thefittest.regressors._gpnnregression import GeneticProgrammingNeuralNetRegressor2
-from sklearn.model_selection import train_test_split
-from sklearn.preprocessing import minmax_scale
-from sklearn.metrics import confusion_matrix
-from sklearn.metrics import f1_score, r2_score
-from sklearn.utils.estimator_checks import check_estimator
-
-
-# data = IrisDataset()
-# X = data.get_X()
-# y = data.get_y()
-
-# X_scaled = minmax_scale(X)
-
-# X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.3)
-# model = GeneticProgrammingNeuralNetClassifier2(
-#     n_iter=10,
-#     pop_size=50,
-#     optimizer=SelfCGP,
-#     optimizer_args={
-#         "show_progress_each": 1,
-#         # "n_jobs": 1
-#     },
-#     weights_optimizer=SHADE,
-#     weights_optimizer_args={
-#         "iters": 100,
-#         "pop_size": 100,
-#     },
-# )
-
-# # check_estimator(model)
-
-
-# import time
-
-
-# begin = time.time()
-# model.fit(X_train, y_train)
-# print(time.time() - begin)
+from thefittest.regressors import GeneticProgrammingRegressor
+from thefittest.optimizers import GeneticProgramming, SelfCGP
+from sklearn.metrics import f1_score, r2_score
 
-# predict = model.predict(X_test)
+# from thefittest.benchmarks import BanknoteDataset
+from collections import defaultdict
+import matplotlib.pyplot as plt
 
-# print("confusion_matrix: \n", confusion_matrix(y_test, predict))
-# print("f1_score: \n", f1_score(y_test, predict, average="macro"))
+from sklearn.utils.estimator_checks import check_estimator
+from sklearn.datasets import load_diabetes
 
-# def problem(x):
-#     return np.sin(x[:, 0])
+def problem(x):
+    return np.sin(x[:, 0])
 
-# function = problem
-# left_border = -4.5
-# right_border = 4.5
-# sample_size = 300
-# n_dimension = 1
+data = load_diabetes()
 
-# X = np.array([np.linspace(left_border, right_border, sample_size) for _ in range(n_dimension)]).T
-# y = function(X)
-# X_scaled = minmax_scale(X)
-# y_scaled = minmax_scale(y)
+X = data.data
+y = data.target
 
-# X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled, test_size=0.33)
-# model = GeneticProgrammingNeuralNetRegressor2(
-#     n_iter=5,
-#     pop_size=15,
-#     optimizer=SelfCGP,
-#     optimizer_args={"show_progress_each": 1, "n_jobs": 2},
-#     weights_optimizer=SHADE,
-#     weights_optimizer_args={"iters": 100, "pop_size": 100},
-# )
+number_of_iterations = 200
+model = GeneticProgrammingRegressor(
+    n_iter=number_of_iterations,
+    pop_size=500,
+    optimizer=SelfCGP,
+    optimizer_args={
+        "keep_history": True,
+        "show_progress_each": 10,
+        "elitism": True,
+    },
+)
 
-# # check_estimator(model)
+check_estimator(model)
 
-# model.fit(X_train, y_train)
+# model.fit(X, y)
 
-# predict = model.predict(X_test)
+# predict = model.predict(X)
 
-# print("coefficient_determination: \n", r2_score(y_test, predict))
+# print(r2_score(y, predict))
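For reference, a minimal end-to-end run of the new estimator mirrors the commented-out lines above; this is a sketch with an arbitrary, untuned search budget, assuming only the fit/predict surface introduced in _gp.py below:

    import numpy as np
    from sklearn.datasets import load_diabetes
    from sklearn.metrics import r2_score
    from thefittest.regressors import GeneticProgrammingRegressor

    X, y = load_diabetes(return_X_y=True)

    # evolve a symbolic expression tree for the regression target
    model = GeneticProgrammingRegressor(n_iter=25, pop_size=100)
    model.fit(X, y)

    # training-set R^2 of the fittest tree
    print(r2_score(y, model.predict(X)))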
diff --git a/src/thefittest/base/_gp.py b/src/thefittest/base/_gp.py
new file mode 100644
index 0000000..aadcdfd
--- /dev/null
+++ b/src/thefittest/base/_gp.py
@@ -0,0 +1,167 @@
+from __future__ import annotations
+
+from abc import ABCMeta, abstractmethod
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Optional
+from typing import Tuple
+from typing import Type
+from typing import Union
+
+import numpy as np
+from numpy.typing import ArrayLike
+from numpy.typing import NDArray
+
+from sklearn.base import BaseEstimator
+from sklearn.base import ClassifierMixin
+from ..base._tree import init_symbolic_regression_uniset
+from sklearn.preprocessing import LabelEncoder
+from sklearn.preprocessing import MinMaxScaler
+from sklearn.utils.multiclass import check_classification_targets
+from sklearn.utils.validation import check_array
+from sklearn.utils.validation import check_is_fitted
+
+
+from ..optimizers import GeneticProgramming
+
+from ..base import UniversalSet
+from ..optimizers import SelfCGP
+from ..utils._metrics import categorical_crossentropy3d
+from ..utils._metrics import root_mean_square_error2d
+from ..utils._metrics import coefficient_determination
+from ..utils.random import check_random_state
+from ..utils.random import randint
+from ..utils.random import uniform
+from ..utils import array_like_to_numpy_X_y
+
+
+def fitness_function(trees: NDArray, y: NDArray[np.float64]) -> NDArray[np.float64]:
+    fitness = []
+    for tree in trees:
+        y_pred = tree() * np.ones(len(y))
+        fitness.append(coefficient_determination(y, y_pred))
+    return np.array(fitness, dtype=np.float64)
+
+
+class BaseGP(BaseEstimator, metaclass=ABCMeta):
+
+    @abstractmethod
+    def __init__(
+        self,
+        *,
+        n_iter: int = 50,
+        pop_size: int = 500,
+        uniset: Optional[UniversalSet] = None,
+        optimizer: Union[Type[SelfCGP], Type[GeneticProgramming]] = SelfCGP,
+        optimizer_args: Optional[dict[str, Any]] = None,
+        random_state: Optional[Union[int, np.random.RandomState]] = None,
+    ):
+        self.n_iter = n_iter
+        self.pop_size = pop_size
+        self.uniset = uniset
+        self.optimizer = optimizer
+        self.optimizer_args = optimizer_args
+        self.random_state = random_state
+
+    def get_optimizer(
+        self,
+    ) -> Union[
+        GeneticProgramming,
+        SelfCGP,
+    ]:
+        return self.trained_optimizer_
+
+    def generator1(self) -> float:
+        value = np.round(uniform(0, 10, 1)[0], 4)
+        return value
+
+    def generator2(self) -> int:
+        value = randint(0, 10, 1)[0]
+        return value
+
+    def check_optimizer_args(self) -> dict:
+        if self.optimizer_args is None:
+            optimizer_args = {}
+        else:
+            optimizer_args = self.optimizer_args.copy()
+            for arg in (
+                "iters",
+                "uniset",
+                "pop_size",
+            ):
+                assert (
+                    arg not in optimizer_args
+                ), f"Do not set '{arg}' in 'optimizer_args'. Instead, use the arguments of the class."
+            for arg in (
+                "fitness_function",
+                "fitness_function_args",
+                "genotype_to_phenotype",
+                "genotype_to_phenotype_args",
+                "minimization",
+                "init_population",
+                "optimal_value",
+            ):
+                assert (
+                    arg not in optimizer_args
+                ), f"Do not set '{arg}' in 'optimizer_args'. It is defined automatically."
+
+        return optimizer_args
+
+    def fit(self, X: ArrayLike, y: ArrayLike):
+
+        optimizer_args = self.check_optimizer_args()
+        check_random_state(self.random_state)
+
+        if isinstance(self, ClassifierMixin):
+            pass
+        else:
+            X, y = self._validate_data(X, y, y_numeric=True, reset=True)
+
+        X, y = array_like_to_numpy_X_y(X, y)
+
+        # TODO: move uniset construction into a separate function
+        if self.uniset is None:
+            uniset = init_symbolic_regression_uniset(
+                X, ephemeral_node_generators=(self.generator1, self.generator2)
+            )
+        else:
+            uniset = self.uniset
+
+        optimizer_args["iters"] = self.n_iter
+        optimizer_args["pop_size"] = self.pop_size
+        optimizer_args["uniset"] = uniset
+
+        if isinstance(self, ClassifierMixin):
+            pass
+
+        else:
+            optimizer_args["fitness_function"] = fitness_function
+            optimizer_args["fitness_function_args"] = {"y": y}
+
+        self.trained_optimizer_ = self.optimizer(**optimizer_args)
+        self.trained_optimizer_.fit()
+
+        self.tree_ = self.trained_optimizer_.get_fittest()["phenotype"]
+
+        return self
+
+    def predict(self, X: NDArray[np.float64]):
+
+        check_is_fitted(self)
+
+        X = check_array(X)
+        n_features = X.shape[1]
+
+        if self.n_features_in_ != n_features:
+            raise ValueError(
+                "Number of features of the model must match the "
+                f"input. Model n_features is {self.n_features_in_} and input "
+                f"n_features is {n_features}."
+            )
+
+        tree_for_predict = self.tree_.set_terminals(**{f"x{i}": X[:, i] for i in range(n_features)})
+
+        y_predict = tree_for_predict() * np.ones(len(X))
+
+        return y_predict
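check_optimizer_args() draws a line between tuning options, which may go in optimizer_args, and the keys that fit() fills in itself. A sketch of that contract; the option names come from the assertions above and from test.py, anything else here is illustrative:

    # Accepted: behavioural options for the optimizer.
    model = GeneticProgrammingRegressor(
        n_iter=100,    # forwarded by fit() as optimizer_args["iters"]
        pop_size=250,  # forwarded by fit() as optimizer_args["pop_size"]
        optimizer_args={"show_progress_each": 10, "keep_history": True},
    )

    # Rejected: reserved keys trip an AssertionError when fit() runs.
    model = GeneticProgrammingRegressor(optimizer_args={"iters": 100})
    # model.fit(X, y)  -> AssertionError: Do not set 'iters' in 'optimizer_args'. ...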
diff --git a/src/thefittest/base/_gpnn.py b/src/thefittest/base/_gpnn.py
index f7a4c21..29d7f5b 100644
--- a/src/thefittest/base/_gpnn.py
+++ b/src/thefittest/base/_gpnn.py
@@ -39,35 +39,7 @@
 from ..utils._metrics import categorical_crossentropy3d
 from ..utils._metrics import root_mean_square_error2d
 from ..utils.random import check_random_state
-
-
-class Model:
-    def _fit(
-        self,
-        X: np.typing.NDArray[np.float64],
-        y: NDArray[Union[np.float64, np.int64]],
-    ) -> Any:
-        pass
-
-    def _predict(self, X: NDArray[np.float64]) -> Any:
-        pass
-
-    def get_optimizer(
-        self: Model,
-    ) -> Any:
-        pass
-
-    def fit(
-        self,
-        X: NDArray[np.float64],
-        y: NDArray[Union[np.float64, np.int64]],
-    ) -> Any:
-        assert np.all(np.isfinite(X))
-        assert np.all(np.isfinite(y))
-        return self._fit(X, y)
-
-    def predict(self, X: NDArray[np.float64]) -> NDArray[Union[np.float64, np.int64]]:
-        return self._predict(X)
+from ..utils import array_like_to_numpy_X_y
 
 
 def fitness_function_structure(
@@ -271,13 +243,6 @@ def __init__(
         self.net_size_penalty = net_size_penalty
         self.random_state = random_state
 
-    def array_like_to_numpy_X_y(
-        self, X: ArrayLike, y: ArrayLike
-    ) -> Tuple[NDArray[np.float64], NDArray[np.int64]]:
-        X = np.array(X, dtype=np.float64)
-        y = np.array(y, dtype=np.float64)
-        return X, y
-
     def get_net(self) -> Net:
         return self.net_
 
@@ -382,7 +347,7 @@
 
         y = self._target_scaler.fit_transform(y.reshape(-1, 1))[:, 0]
 
-        X, y = self.array_like_to_numpy_X_y(X, y)
+        X, y = array_like_to_numpy_X_y(X, y)
 
         if self.offset:
             X = np.hstack([X, np.ones((X.shape[0], 1))])
@@ -428,7 +393,6 @@ def predict(self, X: NDArray[np.float64]):
 
         check_is_fitted(self)
 
         X = check_array(X)
-        self._validate_data
         n_features = X.shape[1]
 
         if self.n_features_in_ != n_features:
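The deleted Model base class guarded against non-finite inputs with bare asserts; the sklearn validators already called in fit() and predict() cover the same case with a proper exception. A quick illustration of that sklearn behaviour, which is not itself part of this patch:

    import numpy as np
    from sklearn.utils.validation import check_array

    check_array(np.array([[1.0, np.inf]]))
    # raises ValueError: input contains infinity or a value too large
    # for dtype('float64')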
diff --git a/src/thefittest/base/_mlp.py b/src/thefittest/base/_mlp.py
index f3fa693..5edda8c 100644
--- a/src/thefittest/base/_mlp.py
+++ b/src/thefittest/base/_mlp.py
@@ -36,6 +36,7 @@
 from ..utils._metrics import root_mean_square_error2d
 from ..utils.random import check_random_state
 from ..utils.transformations import GrayCode
+from ..utils import array_like_to_numpy_X_y
 
 
 weights_type_optimizer_alias = Union[
@@ -49,35 +50,6 @@
 weights_optimizer_alias = Union[DifferentialEvolution, jDE, SHADE, GeneticAlgorithm, SelfCGA, SHAGA]
 
 
-class Model:
-    def _fit(
-        self,
-        X: np.typing.NDArray[np.float64],
-        y: NDArray[Union[np.float64, np.int64]],
-    ) -> Any:
-        pass
-
-    def _predict(self, X: NDArray[np.float64]) -> Any:
-        pass
-
-    def get_optimizer(
-        self: Model,
-    ) -> Any:
-        pass
-
-    def fit(
-        self,
-        X: NDArray[np.float64],
-        y: NDArray[Union[np.float64, np.int64]],
-    ) -> Any:
-        assert np.all(np.isfinite(X))
-        assert np.all(np.isfinite(y))
-        return self._fit(X, y)
-
-    def predict(self, X: NDArray[np.float64]) -> NDArray[Union[np.float64, np.int64]]:
-        return self._predict(X)
-
-
 def fitness_function_weights(
     weights: NDArray[np.float64],
     net: "Net",
@@ -248,13 +220,6 @@ def _defitne_net(self, n_inputs: int, n_outputs: int) -> Net:
         net._offset = self.offset
         return net
 
-    def array_like_to_numpy_X_y(
-        self, X: ArrayLike, y: ArrayLike
-    ) -> Tuple[NDArray[np.float64], NDArray[np.int64]]:
-        X = np.array(X, dtype=np.float64)
-        y = np.array(y, dtype=np.float64)
-        return X, y
-
     def get_optimizer(
         self,
     ) -> Union[
@@ -321,7 +286,7 @@
 
         y = self._target_scaler.fit_transform(y.reshape(-1, 1))[:, 0]
 
-        X, y = self.array_like_to_numpy_X_y(X, y)
+        X, y = array_like_to_numpy_X_y(X, y)
 
         if self.offset:
             X = np.hstack([X, np.ones((X.shape[0], 1))])
@@ -356,7 +321,6 @@ def predict(self, X: NDArray[np.float64]):
 
         check_is_fitted(self)
 
         X = check_array(X)
-        self._validate_data
         n_features = X.shape[1]
diff --git a/src/thefittest/regressors/__init__.py b/src/thefittest/regressors/__init__.py
index 7eb9917..3bdf1f3 100644
--- a/src/thefittest/regressors/__init__.py
+++ b/src/thefittest/regressors/__init__.py
@@ -1,6 +1,6 @@
 from ._gpnnregression import GeneticProgrammingNeuralNetRegressor
 from ._mlpearegressor import MLPEARegressor
-from ._symbolicregressiongp import SymbolicRegressionGP
+from ._gpregressor import GeneticProgrammingRegressor
 
-__all__ = ["SymbolicRegressionGP", "GeneticProgrammingNeuralNetRegressor", "MLPEARegressor"]
+__all__ = ["GeneticProgrammingRegressor", "GeneticProgrammingNeuralNetRegressor", "MLPEARegressor"]
diff --git a/src/thefittest/regressors/_gpregressor.py b/src/thefittest/regressors/_gpregressor.py
new file mode 100644
index 0000000..6f4c911
--- /dev/null
+++ b/src/thefittest/regressors/_gpregressor.py
@@ -0,0 +1,36 @@
+from __future__ import annotations
+
+from typing import Any
+from typing import Optional
+from typing import Type
+from typing import Union
+
+import numpy as np
+
+from sklearn.base import RegressorMixin
+
+from ..base import UniversalSet
+from ..base._gp import BaseGP
+from ..optimizers import GeneticProgramming
+from ..optimizers import SelfCGP
+
+
+class GeneticProgrammingRegressor(RegressorMixin, BaseGP):
+    def __init__(
+        self,
+        *,
+        n_iter: int = 50,
+        pop_size: int = 500,
+        uniset: Optional[UniversalSet] = None,
+        optimizer: Union[Type[SelfCGP], Type[GeneticProgramming]] = SelfCGP,
+        optimizer_args: Optional[dict[str, Any]] = None,
+        random_state: Optional[Union[int, np.random.RandomState]] = None,
+    ):
+        super().__init__(
+            n_iter=n_iter,
+            pop_size=pop_size,
+            uniset=uniset,
+            optimizer=optimizer,
+            optimizer_args=optimizer_args,
+            random_state=random_state,
+        )
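GeneticProgrammingRegressor is deliberately a thin shell: BaseGP provides fit/predict, RegressorMixin provides score (R^2), and the keyword-only __init__ lets BaseEstimator derive get_params/set_params, which is what check_estimator exercises in test.py. For example:

    model = GeneticProgrammingRegressor(pop_size=100)
    print(model.get_params()["pop_size"])  # 100, via BaseEstimator introspection
    model.set_params(n_iter=25)            # sklearn-style reconfiguration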
diff --git a/src/thefittest/regressors/_symbolicregressiongp.py b/src/thefittest/regressors/_symbolicregressiongp.py
deleted file mode 100644
index b844102..0000000
--- a/src/thefittest/regressors/_symbolicregressiongp.py
+++ /dev/null
@@ -1,130 +0,0 @@
-from __future__ import annotations
-
-from typing import Any
-from typing import Optional
-from typing import Type
-from typing import Union
-
-import numpy as np
-from numpy.typing import NDArray
-
-from ..base import UniversalSet
-from ..base._mlp import Model
-from ..base._tree import init_symbolic_regression_uniset
-from ..optimizers import DifferentialEvolution
-from ..optimizers import GeneticAlgorithm
-from ..optimizers import GeneticProgramming
-from ..optimizers import SHADE
-from ..optimizers import SHAGA
-from ..optimizers import SelfCGA
-from ..optimizers import SelfCGP
-from ..optimizers import jDE
-from ..utils._metrics import coefficient_determination
-
-
-def fitness_function(trees: NDArray, y: NDArray[np.float64]) -> NDArray[np.float64]:
-    fitness = []
-    for tree in trees:
-        y_pred = tree() * np.ones(len(y))
-        fitness.append(coefficient_determination(y, y_pred))
-    return np.array(fitness, dtype=np.float64)
-
-
-def generator1() -> float:
-    value = np.round(np.random.uniform(0, 10), 4)
-    return value
-
-
-def generator2() -> int:
-    value = np.random.randint(0, 10)
-    return value
-
-
-class SymbolicRegressionGP(Model):
-    def __init__(
-        self,
-        iters: int,
-        pop_size: int,
-        uniset: Optional[UniversalSet] = None,
-        optimizer: Union[Type[SelfCGP], Type[GeneticProgramming]] = SelfCGP,
-        optimizer_args: Optional[dict[str, Any]] = None,
-    ):
-        Model.__init__(self)
-
-        self._iters: int = iters
-        self._pop_size: int = pop_size
-        self._uniset: Optional[UniversalSet] = uniset
-        self._optimizer_args: Optional[dict[str, Any]] = optimizer_args
-        self._optimizer_class: Union[Type[SelfCGP], Type[GeneticProgramming]] = optimizer
-        self._optimizer: Union[SelfCGP, GeneticProgramming]
-
-    def get_optimizer(
-        self: SymbolicRegressionGP,
-    ) -> Union[
-        DifferentialEvolution,
-        GeneticAlgorithm,
-        GeneticProgramming,
-        jDE,
-        SelfCGA,
-        SelfCGP,
-        SHADE,
-        SHAGA,
-    ]:
-        return self._optimizer
-
-    def _fit(
-        self: SymbolicRegressionGP, X: NDArray[np.float64], y: NDArray[Union[np.float64, np.int64]]
-    ) -> SymbolicRegressionGP:
-        optimizer_args: dict[str, Any]
-        uniset: UniversalSet
-
-        if self._uniset is None:
-            uniset = init_symbolic_regression_uniset(
-                X, ephemeral_node_generators=(generator1, generator2)
-            )
-        else:
-            uniset = self._uniset
-
-        if self._optimizer_args is not None:
-            assert (
-                "iters" not in self._optimizer_args.keys()
-                and "pop_size" not in self._optimizer_args.keys()
-                and "uniset" not in self._optimizer_args.keys()
-            ), """Do not set the "iters", "pop_size", or "uniset" in the "optimizer_args". Instead,
-            use the "SymbolicRegressionGP" arguments"""
-            assert (
-                "fitness_function" not in self._optimizer_args.keys()
-            ), """Do not set the "fitness_function"
-            to the "optimizer_args". It is defined automatically"""
-            assert (
-                "minimization" not in self._optimizer_args.keys()
-            ), """Do not set the "minimization"
-            to the "optimizer_args". It is defined automatically"""
-            optimizer_args = self._optimizer_args.copy()
-
-        else:
-            optimizer_args = {}
-
-        optimizer_args["fitness_function"] = fitness_function
-        optimizer_args["fitness_function_args"] = {"y": y}
-        optimizer_args["iters"] = self._iters
-        optimizer_args["pop_size"] = self._pop_size
-        optimizer_args["uniset"] = uniset
-
-        self._optimizer = self._optimizer_class(**optimizer_args)
-        self._optimizer.fit()
-
-        return self
-
-    def _predict(
-        self: SymbolicRegressionGP, X: NDArray[np.float64]
-    ) -> NDArray[Union[np.float64, np.int64]]:
-        n_dimension = X.shape[1]
-        solution = self.get_optimizer().get_fittest()
-
-        genotype_for_pred = solution["phenotype"].set_terminals(
-            **{f"x{i}": X[:, i] for i in range(n_dimension)}
-        )
-
-        y_pred = genotype_for_pred() * np.ones(len(X))
-        return y_pred
diff --git a/src/thefittest/tests/test_regressors.py b/src/thefittest/tests/test_regressors.py
index 581ebd3..3fcdbd6 100644
--- a/src/thefittest/tests/test_regressors.py
+++ b/src/thefittest/tests/test_regressors.py
@@ -10,7 +10,7 @@
 from ..optimizers import SelfCGP
 from ..regressors import GeneticProgrammingNeuralNetRegressor
 from ..regressors import MLPEARegressor
-from ..regressors import SymbolicRegressionGP
+from ..regressors import GeneticProgrammingRegressor
 from ..base._tree import Add
 from ..base._tree import Div
 from ..base._tree import Mul
diff --git a/src/thefittest/utils/__init__.py b/src/thefittest/utils/__init__.py
index d03087b..e48d763 100644
--- a/src/thefittest/utils/__init__.py
+++ b/src/thefittest/utils/__init__.py
@@ -10,6 +10,7 @@
 from numba.types import List as numbaListType
 
 import numpy as np
+from numpy.typing import ArrayLike
 from numpy.typing import NDArray
 
 
@@ -273,7 +274,7 @@
         return nodes
 
 
-# @njit
+@njit
 def forward2d(
     X: NDArray[np.float64],
     inputs: NDArray[np.int64],
@@ -307,3 +308,11 @@
     keys, cut_index = np.unique(by, return_index=True)
     groups = np.split(group, cut_index)[1:]
     return keys, groups
+
+
+def array_like_to_numpy_X_y(
+    X: ArrayLike, y: ArrayLike
+) -> Tuple[NDArray[np.float64], NDArray[np.float64]]:
+    X = np.array(X, dtype=np.float64)
+    y = np.array(y, dtype=np.float64)
+    return X, y
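The relocated helper coerces both inputs to float64, hence the float64 return annotation for y as well:

    from thefittest.utils import array_like_to_numpy_X_y

    X, y = array_like_to_numpy_X_y([[1, 2], [3, 4]], [0, 1])
    print(X.dtype, y.dtype)  # float64 float64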
diff --git a/src/thefittest/utils/_metrics.py b/src/thefittest/utils/_metrics.py
index 9b5ddc5..2ec455b 100644
--- a/src/thefittest/utils/_metrics.py
+++ b/src/thefittest/utils/_metrics.py
@@ -37,6 +37,9 @@ def coefficient_determination(
     mean_y_true = np.mean(y_true)
     total_sum = np.sum((y_true - mean_y_true) ** 2)
 
+    if total_sum == 0:
+        total_sum = 1e-10
+
     error = y_true - y_predict
     residual_sum = np.sum((error) ** 2)
     r2 = 1 - residual_sum / total_sum
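The epsilon guard makes R^2 well-defined for constant targets, which a GP population of constant-folding trees can easily produce: with y_true constant, total_sum is 0 and the old code divided by zero. A quick sanity check of the guarded behaviour (not taken from the test suite):

    import numpy as np
    from thefittest.utils._metrics import coefficient_determination

    y_true = np.array([2.0, 2.0, 2.0])  # total_sum == 0, guarded to 1e-10
    print(coefficient_determination(y_true, y_true.copy()))
    # 1.0, since residual_sum == 0 and r2 = 1 - 0 / 1e-10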