<a href="https://colab.research.google.com/github/nedokormysh/Stepik_algorithms_ml_course/blob/linear_regression/Linear_regression_clean.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# correct version

from typing import Union, Callable

class MyLineReg():
    """
    Линейная регрессия

    Parameters
    ----------
    n_iter : int, optional
        Количество шагов градиентного спуска, by default 100
    learning_rate : Union[float, Callable], optional
        Коэффициент скорости обучения градиентного спуска.
        Либо фиксированный шаг обучения, если на вход пришло значение float.
        Либо шаг обучения изменяется в соответствии с функцией расчёта, если на вход пришла lambda-функция, by default 0.1
    weights : np.ndarray, optional
        Веса модели, by default None
    metric : str, optional
        Метрика, вычислается отдельно от функциии потерь.
        Принимает одно из следующих значений: mae, mse, rmse, mape, r2, by default None
    reg : str, optional
        Вид регуляризации. Принимает одно из следующих значений: l1, l2, elasticnet, by default None
    l1_coef : float, optional
        Коэффициент L1 регуляризации. Принимает значения от 0.0 до 1.0, by default 0
    l2_coef : float, optional
        Коэффициент L2 регуляризации. Принимает значения от 0.0 до 1.0, by default 0
    sgd_sample : Union[int, float], optional
        Количество примеров, которое будет использоваться на каждой итерации обучения.
        Может принимать целые числа - тогда используем точное количество примеров, либо дробные от 0.0 до 1.0 - тогда выбираем процент примеров от общего количества, by default None
    random_state : int, optional
        Рандомное состояние, by default 42
    """
    def __init__(self,
                 n_iter: int = 100,
                 learning_rate: Union[float, Callable] = 0.1,
                 weights: np.ndarray = None,
                 metric: str = None,
                 reg: str = None,
                 l1_coef: float = 0.,
                 l2_coef: float = 0.,
                 sgd_sample: float = None,
                 random_state: int = 42) -> None:

        self.n_iter = n_iter
        self.learning_rate = learning_rate
        self.weights = weights
        self.metric = metric
        self.reg = reg
        self.l1_coef = l1_coef
        self.l2_coef = l2_coef
        self.sgd_sample = sgd_sample
        random.seed(random_state)

    def __repr__(self):
        return f"MyLineReg class: n_iter={self.n_iter}, learning_rate={self.learning_rate}"

    def get_best_score(self):
        return self.score

    def get_coef(self):
        return self.weights[1:]

    def loss(self, y: pd.Series, y_pred: pd.Series) -> float:

        loss = ((y - y_pred)**2).mean()

        if self.reg == 'l1':
            loss += self.l1_coef * np.abs(self.weights).sum()
        elif self.reg == 'l2':
            loss += self.l2_coef * np.square(self.weights).sum()
        elif self.reg == 'elasticnet':
            loss += self.l1_coef * np.abs(self.weights).sum() + self.l2_coef * np.square(self.weights).sum()
        return loss

    def predict(self, X):
         X = pd.concat([pd.DataFrame([1] * X.shape[0], index=X.index), X], axis=1)
         return np.dot(X, self.weights)

    def get_batch(self, x_length):
        return random.sample(range(x_length), int(self.sgd_sample * x_length)) if isinstance(self.sgd_sample, float) else random.sample(range(x_length), self.sgd_sample)

    @staticmethod
    def get_metric_score(metric: str, y: pd.Series, y_pred: pd.Series) -> float:
        """
        Вычисление метрики.

        mae - средняя абсолютная ошибка
        mse - среднеквадратичная ошибка
        rmse - квадратный корень из среднеквадратичной ошибки
        r2 - коэффициент детерминации
        mape - средняя абсолютная ошибка в процента
        """

        if metric == 'mae':
            score = (y - y_pred).abs().mean()
        elif metric == 'mse':
            score = np.mean((y_pred - y)**2)
        elif metric == 'rmse':
            score = np.sqrt(np.mean((y_pred - y)**2))
        elif metric == 'mape':
            score = (100 / y.shape[0]) * np.sum(abs((y - y_pred) / y))
        elif metric == 'r2':
            score = 1 - ((np.sum((y - y_pred)**2)) / (np.sum((y - np.mean(y))**2)))
        return score

    def grad(self, y: pd.Series, y_pred: np.array, X: pd.DataFrame, batch_idxs: list) -> float:
        y = y.iloc[batch_idxs]
        y_pred = y_pred[batch_idxs]
        X = X.iloc[batch_idxs]

        if not self.reg:
           grad = (2 / y.shape[0]) * ((y_pred - y)).dot(X)
        elif self.reg == 'l1':
           grad = (2 / y.shape[0]) * ((y_pred - y)).dot(X) + self.l1_coef * np.sign(self.weights)
        elif self.reg == 'l2':
           grad = (2 / y.shape[0]) * ((y_pred - y)).dot(X) + self.l2_coef * 2 * (self.weights)
        elif self.reg == 'elasticnet':
           grad = (2 / y.shape[0]) * ((y_pred - y)).dot(X) + self.l1_coef * np.sign(self.weights) + self.l2_coef * 2 * (self.weights)

        return grad

    def fit(self, X: pd.DataFrame, y: pd.Series, verbose: int = False) -> None:
        """Обучение линейной регрессии

        Parameters
        ----------
        X : pd.DataFrame
            Все фичи
        y : pd.Series
            Целевая переменная
        verbose : int, optional
            Указывает через сколько итераций градиентного спуска будет выводиться лог
        """
        X = pd.concat([pd.DataFrame([1] * X.shape[0], index=X.index), X], axis=1)

        self.weights = np.ones(X.shape[1])

        for i in range(self.n_iter):
            sample_rows_idx = self.get_batch(X.shape[0]) if self.sgd_sample else range(X.shape[0])
            y_pred = np.dot(X, self.weights)
            err = self.loss(y, y_pred)
            nabla = self.grad(y, y_pred, X, sample_rows_idx)

            if isinstance(self.learning_rate, (int, float)):
                self.weights -= self.learning_rate * nabla
            else:
                self.weights -= self.learning_rate(i + 1) * nabla

            if self.metric:
               self.score = getattr(self, 'get_metric_score')(self.metric, y, np.dot(X, self.weights))

            if verbose and i % verbose == 0:
               if self.metric:
                   print(f'start | loss = {err} | {self.score} | learning_rate = {self.learning_rate}') if i == 0 else print(f'i = {i} | loss = {err} | {self.score} | learning_rate = {self.learning_rate}')
               else:
                   print(f'start | loss = {err} | learning_rate = {self.learning_rate}') if i == 0 else print(f'i = {i} | loss = {err} | learning_rate = {self.learning_rate}')