In [1]:
from sklearn.datasets import make_classification
import pandas as pd 
import numpy as np 
import random

In [2]:
X, y = make_classification(n_samples=400, n_features=4)
X = pd.DataFrame(X, columns=['f1', 'f2', 'f3', 'f4'])
y = pd.Series(y)

### Стохастический градиентный спуск

И последнее... научимся выполнять стохастический градиентный спуск. 
Для этого добавьте в класс MyLogReg два новых параметра:

    sgd_sample — кол-во образцов, которое будет использоваться на каждой итерации обучения. Может принимать либо целые числа, либо дробные от 0.0 до   1.0.
    По-умолчанию: None
    random_state — для воспроизводимости результата зафиксируем сид (об этом далее).
    По-умолчанию: 42

Внесем изменение в алгоритм обучения:

    В начале обучения фискируем сид (см. ниже).
    В начале каждого шага формируется новый мини-пакет на основе параметра sgd_sample:
    Если заданы целые числа, то из исходного датасета берется ровно столько примеров, сколько указано.
    Если задано дробное число, то рассматриваем его как долю от количества строк в исходном датасете (округленное до целого числа).
    Расчет градиента (и последующее изменение весов) делаем на основе мини-пакета.
    Все остальные параметры, если они заданы (например, регуляризация), также должны учитываться при обучении.
    Ошибку и метрику необходимо считать на всем датасете, а не на мини-пакете.
    Если sgd_sample = None, то обучение выполняется как раньше (на всех данных).

**Случайная генерация**

Случайные подвыборки будем генерировать, как и раньше.

В начале обучения посредством модуля random фиксируем сид:

   `random.seed(<random_state>)`


В начале каждой итерации сформируем номера строк, которые стоит отобрать.

`sample_rows_idx = random.sample(range(X.shape[0]), <sgd_sample>)`




In [4]:
y

0      1
1      0
2      0
3      1
4      0
      ..
395    0
396    1
397    0
398    0
399    0
Length: 400, dtype: int32

In [5]:
def get_sgd_sample(n_row, n):
    if type(n) == int:
        return n 
    elif type(n) == float:
        return n_row * n

In [6]:
get_sgd_sample(len(y), 0.5)

200.0

In [7]:
len(y)

400

In [8]:
2

2

In [9]:
'''
            pred = X_copy.dot(self.__weights)                   # делаем предсказание модели
            proba = 1 / (1 + np.exp(-pred))                     # переводим предсказания в вероятности через функцию сигмоиды
            grad = (1/len(y))*(proba - y).dot(X_copy)           # вычисляем градиент LogLoss
            grad = self.__apply_regularization(grad, self.__weights)   # добавляем регуляризацию
            #self.__weights -= self.learning_rate * grad        # делаем шаг обучения
            if self.metric:                                    # если задана метрика 
                if i == self.n_iter-1:                         # считаем его на n - 1 итерации, чтобы последнне значение записать в best_score
                    marks = (proba>0.5).astype(int)
                    self.__best_score = self.__calculat_metric(y_true=y, y_pred=marks if self.metric!='roc_auc' else proba)

            if verbose and (i % 10 == 0 or i == self.n_iter-1):
                self.__log_training_step(i, y, proba, self.__weights)
                
            lr = self.__get_learning_rate(i+1)   # проверяем на динамичность lr

            self.__weights -= lr * grad          # делаем шаг обучения
''';

In [11]:
random.seed(31)
for i in range(5):
    print(random.sample(range(9), 3))

[0, 7, 8]
[6, 2, 5]
[0, 2, 8]
[8, 3, 5]
[2, 8, 5]


In [25]:
class MyLogReg():
    def __init__(self, n_iter=10, learning_rate=0.1, reg=None, l1_coef=0, l2_coef=0, random_state=42, sgd_sample=None, metric=None, weights=None):
        self.n_iter = n_iter 
        self.learning_rate = learning_rate
        self.__weights = weights
        self.metric = metric
        self.reg = reg
        self.l1_coef = l1_coef
        self.l2_coef = l2_coef
        self.sgd_sample = sgd_sample
        self.random_state = random_state
        self.__best_score = None

        self.__validate_params()
        
    def __repr__(self):
        return f'MyLogReg class: n_iter={self.n_iter}, learning_rate={self.learning_rate}'

    def __validate_params(self):
        """Проверяет корректность параметров модели."""
        if self.metric is not None and self.metric not in ['accuracy', 'precision', 'recall', 'f1', 'roc_auc']:
            raise ValueError(f"Invalid metric: {self.metric}, You can only use: 'accuracy', 'precision', 'recall', 'f1' or 'roc_auc' ")
        # Проверям на входные параметры регулиризации 
        if self.reg is not None and self.reg not in ['l1', 'l2', 'elasticnet']:
            raise ValueError("reg must be 'l1', 'l2', or 'elasticnet'")
        # Проверка коэффициентов (должны быть от 0 до 1)
        if self.l1_coef < 0 or self.l1_coef > 1:
            raise ValueError("l1_coef must be in [0, 1]")
        if self.l2_coef < 0 or self.l2_coef > 1:
            raise ValueError("l2_coef must be in [0, 1]")
    
    def __log_training_step(self, iteration: int, y: np.array, proba: np.array, w: np.array):
        """Логирует процесс обучения (итерацию, веса, функцию потерь).
    
        Параметры:
        iteration: int - номер итерации
        X: np.array - матрица признаков
        y: np.array - вектор истинных меток (0 или 1)
        proba: np.array - предсказанные вероятности класса 1
        """
        loss = self.__calculate_loss(y, proba, w)                           # считаем loss 
        metric_value = None                                                 # Инициализируем метрикику 
        if self.metric:                                                     # если она задана 
            y_pred = (proba>0.5).astype(int)                                # вероятности переводим в метки
            metric_value = self.__calculat_metric(y, y_pred if self.metric != 'roc_auc' else proba)       # cчитаем метрику
            
        if iteration == 0:
            if self.metric:
                print(f'start | loss: {loss:0.2f} | {self.metric}: {metric_value:0.2f}')
            else:
                print(f'start | loss: {loss:0.2f}')
        else:
            if self.metric:
                print(f'{iteration} | loss: {loss:0.2f} | {self.metric}: {metric_value:0.2f}')
            else:
                print(f'{iteration} | loss: {loss:0.2f}')
    def __calculate_loss(self, y: np.array, proba: np.array, w: np.array) -> float:
        """
        Вычисляем общий loss (MSLE + регуляризация).
        X: np.array (Матрица признаков)
        y: np.array (Вектор значений)
        w: np.array (Вектор весов)
        """
        eps = 1e-15
        loss = -np.mean(y*np.log(proba+eps) + (1-y)*(np.log(1-proba+eps)))

        if self.reg == 'l1':
            return loss + self.l1_coef * np.sum(np.abs(w))
        elif self.reg == 'l2':
            return loss + self.l2_coef * np.sum(w**2)
        elif self.reg == 'elasticnet':
            return loss + self.l1_coef * np.sum(np.abs(w)) + self.l2_coef * np.sum(w**2)
        else:
            return loss

    def __apply_regularization(self, grad: np.array, w: np.array):
        """Добавляет градиент регуляризации."""
        if self.reg == 'l1':
            return grad + self.l1_coef * np.sign(w)
        elif self.reg == 'l2':
            return grad + 2 * self.l2_coef * w
        elif self.reg == 'elasticnet':
            return grad + self.l1_coef * np.sign(w) + 2 * self.l2_coef * w
        else:
            return grad
    
    def __calculat_metric(self, y_true: np.array, y_pred: np.array):
        """Считает заданные метрики."""
        #  Напишем функцию для подсчета матрицы ошибок
        def confusion_matrix(y_true: np.array, y_pred: np.array):
            # инициализируем нашу матрицу ошибок
            tn = fn = fp = tp = 0 
            for true, pred in zip(y_true, y_pred):
                if pred == 0 and true == 0:          # TN
                    tn += 1 
                elif pred == 0 and true == 1:        # FN
                    fn += 1 
                elif pred == 1 and true == 0:        # FP
                    fp += 1 
                elif pred == 1 and true == 1:        # TP
                    tp += 1 
            return tn, fn, fp, tp
        
        if self.metric == 'roc_auc':
            # Предсказанные вероятности
            y_scores = y_pred
            # Сортируем по вероятностям
            sorted_indices = np.argsort(y_scores)
            sorted_y = np.array(y_true)[sorted_indices]

            # Присваиваем ранги (от 1 до n, как в scipy)
            n = len(y_scores)
            ranks = np.empty(n)
            i = 0
            while i < n:
                j = i
                while j + 1 < n and y_scores[sorted_indices[j]] == y_scores[sorted_indices[j + 1]]:
                    j += 1
                avg_rank = (i + j + 2) / 2  # т.к. ранги начинаются с 1
                for k in range(i, j + 1):
                    ranks[k] = avg_rank
                i = j + 1

            # Считаем сумму рангов положительного класса
            sum_ranks_pos = np.sum(ranks[sorted_y == 1])

            P = np.sum(sorted_y)         # количество положительных
            N = n - P                    # количество отрицательных

            if P == 0 or N == 0:
                return None
            # Вычисляем AUC по формуле Манна-Уитни
            auc = (sum_ranks_pos - P * (P + 1) / 2) / (P * N)
            return auc
        tn, fn, fp, tp = confusion_matrix(y_true, y_pred)

        if self.metric == 'accuracy':
            return (tp + tn) / (tp + tn + fp + fn)
        elif self.metric == 'precision':
            return tp / (tp + fp)
        elif self.metric == 'recall':
            return tp / (tp + fn)
        elif self.metric == 'f1':
            pr = tp / (tp + fp)      # precision
            re = tp / (tp + fn)      # recall
            return (2*pr*re) / (pr + re)
        
    def __get_learning_rate(self, iteration: int):
        """Динамический или статический lr."""
        if callable(self.learning_rate):
            return self.learning_rate(iteration)
        else:
            return self.learning_rate 

    def __get_sgd_sample(self ,n_row: int, sample):
        """Вернет размер батча для обучения SGD"""
        if type(sample) == int:
            return sample
        elif type(sample) == float:
            return max(1, int(n_row * sample))                                   # если задана слишком маленькая доля

    def __loss_and_metric_for_sgd(self, X:pd.DataFrame, y:pd.Series, w:np.array):
        """Метод для подсчета ласса и метрики для работы версии SGD"""
        ## В SGD версии нужно сичиать ошибку и метрику на всем датафрейме
        ## а обучаться только на мини батчах 
        pred = np.dot(X, w)                  # делаем предсказания 
        proba = 1/(1 + np.exp(-pred))        # переводим в вероятности
        marks = (proba>0.5).astype(int)      # переводим вероятности в метки класса, для подсчтета метрик 
        return pred, proba, marks

    def fit(self, X: pd.DataFrame, y: pd.Series, verbose=False):
        """
        Метод обучает модель Логистической регресси 
        Входные параметры:
        X: pd.DataFrame
        y: pd.Series
        verbose: bool
        """
        random.seed(self.random_state)                          # зафиксируем seed
        X_copy = X.copy()                                       # копируем матрицу признаков, чтобы не изменить оригинальный 
        X_copy.insert(0, 'base', 1)                             # добавляем столбик для свободного члена, заполним его 1
        self.__weights = np.ones(X_copy.shape[1])               # создаем вектор весов, заполненный 1

        # Цикл обучения
        for i in range(self.n_iter):
            if self.sgd_sample is not None:                                        # если у нас версия SGD, то для обучения формируем мини батчи 
                sgd_sample = self.__get_sgd_sample(y.shape[0], self.sgd_sample)
                sample_rows_idx = random.sample(range(X.shape[0]), sgd_sample)
                X_batch = X_copy.iloc[sample_rows_idx].to_numpy()
                y_batch = y.iloc[sample_rows_idx].to_numpy()
            else:                                                                  # иначи обучаемся на всех данных 
                X_batch = X_copy.to_numpy()
                y_batch = y.to_numpy()
            pred = X_batch.dot(self.__weights)                          # делаем предсказание модели
            proba = 1 / (1+np.exp(-pred))                               # переводим предсказания в вероятности через функцию сигмоиды
            grad = (1/len(y_batch))*(proba - y_batch).dot(X_batch)      # вычисляем градиенты функци LogLoss по весам
            grad = self.__apply_regularization(grad, self.__weights)    # добавляем регуляризацию, ели она установлена
            if self.metric:                                             # если задана метрика, то считаем его и обновляем best_score
                if i == self.n_iter-1:
                    pred, proba, marks = self.__loss_and_metric_for_sgd(X_copy, y, self.__weights)
                    self.__best_score = self.__calculat_metric(y_true=y, y_pred=marks if self.metric!='roc_auc' else proba)
            if verbose and (i % 10 == 0 or i == self.n_iter-1):         # логируем вывод
                _, proba, _ = self.__loss_and_metric_for_sgd(X_copy, y, self.__weights)
                self.__log_training_step(i, y, proba, self.__weights)
            lr = self.__get_learning_rate(i+1)                          # проверка на динамичность шага обучения
            self.__weights -= lr * grad                                 # спуск -> обновление веосв 
        
    def predict_proba(self, X: pd.DataFrame) -> np.array:
        """Вернет вероятностное предсказание модели"""
        X_copy = X.copy()
        X_copy.insert(0, 'base', 1)
        pred = X_copy.dot(self.__weights)
        proba = 1 / (1 + np.exp(-pred))
        return np.array(proba)

    def predict(self, X: pd.DataFrame) -> np.array:
        """Вернет маркерное предсказание модели"""
        proba = self.predict_proba(X)
        return np.array((proba > 0.5).astype(int))

    def get_best_score(self):
        if self.metric is None:
            raise ValueError("Metric was not set during model initialization.")
        return self.__best_score
                
    def get_coef(self) -> np.array:
        return np.array(self.__weights[1:])


In [27]:
np.array([[2,3, 4], [2, 1, 5]]).shape

(2, 3)

In [29]:
400*0.4

160.0

In [67]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report

my_model = MyLogReg(n_iter=100, metric='accuracy')
sk_model = LogisticRegression()

In [69]:
my_model.fit(X, y, verbose=True)

start | loss: 0.94 | accuracy: 0.67
10 | loss: 0.62 | accuracy: 0.73
20 | loss: 0.41 | accuracy: 0.81
30 | loss: 0.30 | accuracy: 0.86
40 | loss: 0.24 | accuracy: 0.91
50 | loss: 0.20 | accuracy: 0.94
60 | loss: 0.18 | accuracy: 0.95
70 | loss: 0.17 | accuracy: 0.97
80 | loss: 0.16 | accuracy: 0.97
90 | loss: 0.16 | accuracy: 0.97
99 | loss: 0.15 | accuracy: 0.97


In [73]:
sk_model.fit(X, y)

In [79]:
print(f'Качество моей реализации: \n')
print(classification_report(y, my_model.predict(X)))

print(f'Качество версии из sklearn: \n')
print(classification_report(y, sk_model.predict(X)))

Качество моей реализации: 

              precision    recall  f1-score   support

           0       0.98      0.96      0.97       200
           1       0.96      0.98      0.97       200

    accuracy                           0.97       400
   macro avg       0.97      0.97      0.97       400
weighted avg       0.97      0.97      0.97       400

Качество версии из sklearn: 

              precision    recall  f1-score   support

           0       0.97      0.98      0.97       200
           1       0.98      0.96      0.97       200

    accuracy                           0.97       400
   macro avg       0.97      0.97      0.97       400
weighted avg       0.97      0.97      0.97       400

