Commit

sherstpasha committed Mar 12, 2024
1 parent dc7b788 commit a46128a
Showing 10 changed files with 250 additions and 281 deletions.
100 changes: 28 additions & 72 deletions src/test.py
@@ -1,90 +1,46 @@
import numpy as np
import matplotlib.pyplot as plt

from thefittest.optimizers import SelfCGP
from thefittest.optimizers import SHADE
from thefittest.benchmarks import BanknoteDataset, IrisDataset
from thefittest.classifiers._gpnnclassifier import GeneticProgrammingNeuralNetClassifier2
from thefittest.regressors._gpnnregression import GeneticProgrammingNeuralNetRegressor2
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import minmax_scale
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score, r2_score
from sklearn.utils.estimator_checks import check_estimator


# data = IrisDataset()
# X = data.get_X()
# y = data.get_y()

# X_scaled = minmax_scale(X)

# X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.3)

# model = GeneticProgrammingNeuralNetClassifier2(
#     n_iter=10,
#     pop_size=50,
#     optimizer=SelfCGP,
#     optimizer_args={
#         "show_progress_each": 1,
#         # "n_jobs": 1
#     },
#     weights_optimizer=SHADE,
#     weights_optimizer_args={
#         "iters": 100,
#         "pop_size": 100,
#     },
# )

# # check_estimator(model)


# import time


# begin = time.time()
# model.fit(X_train, y_train)
# print(time.time() - begin)
from thefittest.regressors import GeneticProgrammingRegressor
from thefittest.optimizers import GeneticProgramming, SelfCGP

from sklearn.metrics import f1_score, r2_score

# predict = model.predict(X_test)
# from thefittest.benchmarks import BanknoteDataset
from collections import defaultdict

import matplotlib.pyplot as plt

# print("confusion_matrix: \n", confusion_matrix(y_test, predict))
# print("f1_score: \n", f1_score(y_test, predict, average="macro"))
from sklearn.utils.estimator_checks import check_estimator
from sklearn.datasets import load_diabetes


# def problem(x):
# return np.sin(x[:, 0])
def problem(x):
    # Toy symbolic-regression target (used by the commented-out experiment above).
    return np.sin(x[:, 0])


# function = problem
# left_border = -4.5
# right_border = 4.5
# sample_size = 300
# n_dimension = 1
data = load_diabetes()

# X = np.array([np.linspace(left_border, right_border, sample_size) for _ in range(n_dimension)]).T
# y = function(X)
# X_scaled = minmax_scale(X)
# y_scaled = minmax_scale(y)
X = data.data
y = data.target

# X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_scaled, test_size=0.33)

# model = GeneticProgrammingNeuralNetRegressor2(
#     n_iter=5,
#     pop_size=15,
#     optimizer=SelfCGP,
#     optimizer_args={"show_progress_each": 1, "n_jobs": 2},
#     weights_optimizer=SHADE,
#     weights_optimizer_args={"iters": 100, "pop_size": 100},
# )
number_of_iterations = 200

model = GeneticProgrammingRegressor(
    n_iter=number_of_iterations,
    pop_size=500,
    optimizer=SelfCGP,
    optimizer_args={
        "keep_history": True,
        "show_progress_each": 10,
        "elitism": True,
    },
)

# # check_estimator(model)
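# check_estimator runs scikit-learn's estimator API compliance checks and
# raises on the first failure.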
check_estimator(model)

# model.fit(X_train, y_train)
# model.fit(X, y)

# predict = model.predict(X_test)
# predict = model.predict(X)

# print("coefficient_determination: \n", r2_score(y_test, predict))
# print(r2_score(y, predict))
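For reference, the evaluation flow kept in the trailing comments would, uncommented, read as follows — a minimal sketch that fits and scores in-sample on the full diabetes set:

model.fit(X, y)

predict = model.predict(X)

print(r2_score(y, predict))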
167 changes: 167 additions & 0 deletions src/thefittest/base/_gp.py
@@ -0,0 +1,167 @@
from __future__ import annotations

from abc import ABCMeta, abstractmethod
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from typing import Tuple
from typing import Type
from typing import Union

import numpy as np
from numpy.typing import ArrayLike
from numpy.typing import NDArray

from sklearn.base import BaseEstimator
from sklearn.base import ClassifierMixin
from ..base._tree import init_symbolic_regression_uniset
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils.multiclass import check_classification_targets
from sklearn.utils.validation import check_array
from sklearn.utils.validation import check_is_fitted


from ..optimizers import GeneticProgramming

from ..base import UniversalSet
from ..optimizers import SelfCGP
from ..utils._metrics import categorical_crossentropy3d
from ..utils._metrics import root_mean_square_error2d
from ..utils._metrics import coefficient_determination
from ..utils.random import check_random_state
from ..utils.random import randint
from ..utils.random import uniform
from ..utils import array_like_to_numpy_X_y


def fitness_function(trees: NDArray, y: NDArray[np.float64]) -> NDArray[np.float64]:
    # Score each candidate tree by the coefficient of determination (R^2)
    # between its predictions and the targets.
    fitness = []
    for tree in trees:
        y_pred = tree() * np.ones(len(y))
        fitness.append(coefficient_determination(y, y_pred))
    return np.array(fitness, dtype=np.float64)


class BaseGP(BaseEstimator, metaclass=ABCMeta):

    @abstractmethod
    def __init__(
        self,
        *,
        n_iter: int = 50,
        pop_size: int = 500,
        uniset: Optional[UniversalSet] = None,
        optimizer: Union[Type[SelfCGP], Type[GeneticProgramming]] = SelfCGP,
        optimizer_args: Optional[dict[str, Any]] = None,
        random_state: Optional[Union[int, np.random.RandomState]] = None,
    ):
        self.n_iter = n_iter
        self.pop_size = pop_size
        self.uniset = uniset
        self.optimizer = optimizer
        self.optimizer_args = optimizer_args
        self.random_state = random_state

    def get_optimizer(
        self,
    ) -> Union[
        GeneticProgramming,
        SelfCGP,
    ]:
        return self._optimizer

    def generator1(self) -> float:
        # Ephemeral constant generator: random float in [0, 10), rounded to 4 decimals.
        value = np.round(uniform(0, 10, 1)[0], 4)
        return value

    def generator2(self) -> int:
        # Ephemeral constant generator: random integer in [0, 10).
        value = randint(0, 10, 1)[0]
        return value

    def check_optimizer_args(self) -> dict:
        if self.optimizer_args is None:
            optimizer_args = {}
        else:
            optimizer_args = self.optimizer_args.copy()
            for arg in (
                "iters",
                "uniset",
                "pop_size",
            ):
                assert (
                    arg not in optimizer_args
                ), f"Do not set '{arg}' in 'optimizer_args'. Instead, use the arguments of the class."
            for arg in (
                "fitness_function",
                "fitness_function_args",
                "genotype_to_phenotype",
                "genotype_to_phenotype_args",
                "minimization",
                "init_population",
                "optimal_value",
            ):
                assert (
                    arg not in optimizer_args
                ), f"Do not set '{arg}' in 'optimizer_args'. It is defined automatically."

        return optimizer_args

    def fit(self, X: ArrayLike, y: ArrayLike):

        optimizer_args = self.check_optimizer_args()
        check_random_state(self.random_state)

        if isinstance(self, ClassifierMixin):
            pass  # The classifier branch is not implemented in this commit.
        else:
            X, y = self._validate_data(X, y, y_numeric=True, reset=True)

        X, y = array_like_to_numpy_X_y(X, y)

        # TODO: move uniset initialization into a separate function
        if self.uniset is None:
            uniset = init_symbolic_regression_uniset(
                X, ephemeral_node_generators=(self.generator1, self.generator2)
            )
        else:
            uniset = self.uniset

        optimizer_args["iters"] = self.n_iter
        optimizer_args["pop_size"] = self.pop_size
        optimizer_args["uniset"] = uniset

        if isinstance(self, ClassifierMixin):
            pass  # The classifier branch is not implemented in this commit.
        else:
            optimizer_args["fitness_function"] = fitness_function
            optimizer_args["fitness_function_args"] = {"y": y}

        self.trained_optimizer_ = self.optimizer(**optimizer_args)
        self.trained_optimizer_.fit()

        self.tree_ = self.trained_optimizer_.get_fittest()["phenotype"]

        return self

    def predict(self, X: NDArray[np.float64]):

        check_is_fitted(self)

        X = check_array(X)
        n_features = X.shape[1]

        if self.n_features_in_ != n_features:
            raise ValueError(
                "Number of features of the model must match the "
                f"input. Model n_features is {self.n_features_in_} and input "
                f"n_features is {n_features}."
            )

        # Bind each input column to its terminal (x0, x1, ...) before evaluating the tree.
        tree_for_predict = self.tree_.set_terminals(**{f"x{i}": X[:, i] for i in range(n_features)})

        y_predict = tree_for_predict() * np.ones(len(X))

        return y_predict
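Since BaseGP.__init__ is abstract, concrete estimators are expected to subclass it. A minimal sketch of what a concrete regressor could look like — the class name is hypothetical and the real GeneticProgrammingRegressor in this commit may differ:

from sklearn.base import RegressorMixin


class SketchGPRegressor(RegressorMixin, BaseGP):
    # Hypothetical subclass for illustration only.
    def __init__(
        self,
        *,
        n_iter: int = 50,
        pop_size: int = 500,
        uniset: Optional[UniversalSet] = None,
        optimizer: Union[Type[SelfCGP], Type[GeneticProgramming]] = SelfCGP,
        optimizer_args: Optional[dict[str, Any]] = None,
        random_state: Optional[Union[int, np.random.RandomState]] = None,
    ):
        # The abstract __init__ only stores hyperparameters, so the
        # override simply forwards them unchanged.
        super().__init__(
            n_iter=n_iter,
            pop_size=pop_size,
            uniset=uniset,
            optimizer=optimizer,
            optimizer_args=optimizer_args,
            random_state=random_state,
        )

With RegressorMixin in the MRO, the isinstance(self, ClassifierMixin) branches in fit() take the regression path.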
40 changes: 2 additions & 38 deletions src/thefittest/base/_gpnn.py
@@ -39,35 +39,7 @@
from ..utils._metrics import categorical_crossentropy3d
from ..utils._metrics import root_mean_square_error2d
from ..utils.random import check_random_state


class Model:
    def _fit(
        self,
        X: np.typing.NDArray[np.float64],
        y: NDArray[Union[np.float64, np.int64]],
    ) -> Any:
        pass

    def _predict(self, X: NDArray[np.float64]) -> Any:
        pass

    def get_optimizer(
        self: Model,
    ) -> Any:
        pass

    def fit(
        self,
        X: NDArray[np.float64],
        y: NDArray[Union[np.float64, np.int64]],
    ) -> Any:
        assert np.all(np.isfinite(X))
        assert np.all(np.isfinite(y))
        return self._fit(X, y)

    def predict(self, X: NDArray[np.float64]) -> NDArray[Union[np.float64, np.int64]]:
        return self._predict(X)
from ..utils import array_like_to_numpy_X_y


def fitness_function_structure(
@@ -271,13 +243,6 @@ def __init__(
        self.net_size_penalty = net_size_penalty
        self.random_state = random_state

    def array_like_to_numpy_X_y(
        self, X: ArrayLike, y: ArrayLike
    ) -> Tuple[NDArray[np.float64], NDArray[np.int64]]:
        X = np.array(X, dtype=np.float64)
        y = np.array(y, dtype=np.float64)
        return X, y

    def get_net(self) -> Net:
        return self.net_

@@ -382,7 +347,7 @@ def fit(self, X: ArrayLike, y: ArrayLike):

        y = self._target_scaler.fit_transform(y.reshape(-1, 1))[:, 0]

        X, y = self.array_like_to_numpy_X_y(X, y)
        X, y = array_like_to_numpy_X_y(X, y)

        if self.offset:
            X = np.hstack([X, np.ones((X.shape[0], 1))])
@@ -428,7 +393,6 @@ def predict(self, X: NDArray[np.float64]):
        check_is_fitted(self)

        X = check_array(X)
        self._validate_data
        n_features = X.shape[1]

        if self.n_features_in_ != n_features:
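The per-class array_like_to_numpy_X_y method removed above is replaced by a shared helper imported from ..utils. Judging from the deleted method, a functionally equivalent free function would be — the actual implementation in thefittest.utils may differ:

import numpy as np


def array_like_to_numpy_X_y(X, y):
    # Coerce array-likes to float64 numpy arrays.
    X = np.array(X, dtype=np.float64)
    y = np.array(y, dtype=np.float64)
    return X, y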
