In [1]:
import numpy as np
from sklearn.datasets import load_iris
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [2]:
class LogisticRegression:
    """ Классификатор на основе логистической регрессии для двух категорий.
    
    Параметры
    ---------
    random_state : int
      Начальное состояние генератора случаных чисел для первичной инициализации весов
    """
    
    def __init__(self, random_state=None):
        self._random_state = random_state

        if self._random_state:
            self.__rand = np.random.RandomState(self._random_state)
        else:
            self.__rand = np.random
    
    
    def fit(self, X, y, eta=0.01, max_iter=1000, method='GD', cbatch=20, tol=1e-7, sigma=0.1):
        """ Подгонка данных
        
        Параметры
        ---------
        X : {array-like}
          Матрица обучающих данных
        y : array-like
          Список категорий
        tol : number
          Минимальная разница между текущей ошибкой и предыдущей для прерывания оптимизации. По умолчанию 1e-7
        sigma : number
          Коэфициент импульса. Используется в методе SGD-MOMENTUM, SGD-NESTEROV и SGD-RMSPROP
        eta : float
          Скорость обучения модели. По умолчанию 0.01
        method : str
          Метод оптимизации. По умолчанию GD.
          Может принимать значения:
            GD - градиентный спуск
            SGD - стохастический градиентный спуск
            SGD-MOMENTUM - стохастический градиентный спуск по стандартной формуле моментов
            SGD-NESTEROV - стохастический градиентный спуск по формуле моментов Нестерова
            SGD-RMSPROP - стохастический градиентный спуск по формуле RMSPROP
        cbatch : int
          Количество элементов в каждом батче градиентного спуска. Используется только для методов SGD. По умолчанию 20.
        max_iter: int
          Количество эпох оптимизации. По умолчанию 1000.
        """
        
        method = method.upper()
        
        method_list = ('GD', 'SGD', 'SGD-MOMENTUM', 'SGD-NESTEROV', 'SGD-RMSPROP')
        assert method in method_list, f"method должен принимать значения {', '.join(method_list)}"
        assert 0<=tol<=1, "Значение tol должно быть от 0 до 1"
        assert 0<=sigma<=1, "Значение sigma должно быть от 0 до 1"
        assert 0<=cbatch, "Значение cbatch должно быть больше или равно 0"
        assert 0<=max_iter, "Значение max_iter должно быть больше или равно 0"
        assert 0<=eta, "Значение eta должно быть больше или равно 0"
        assert len(set(y))==2, "Количество категорий в y должно быть две"
            
        # Добавляем x0=1 в каждый обучающий образец
        X = np.c_[np.ones(X.shape[0]), X]
        
        # Инициализируем веса
        self._init_weights(X)
        
        # Переменная с ошибками каждой эпохи
        self.cost_ = []
        
        cost_prev = np.inf
        
        # Счетчик количества эпох обучения
        self.epoch_count_ = 0
        
        # Оптимизация методом градиентного спуска
        if method == 'GD':
            for _ in range(int(max_iter)):
                
                self.epoch_count_ += 1
                
                cost = self._update_weights_gd(X, y, eta=eta)
                
                self.cost_.append(cost)
                
                # Если абсолютное значение разницы текущей ошибки и предыдущей меньше tol, то завершаем обучение
                if np.abs(cost_prev - cost) < tol:
                    return
                
                # Текущая ошибка становится предыдущей перед переходом к следующей эпохе
                cost_prev = cost
                
        elif method in ('SGD', 'SGD-MOMENTUM', 'SGD-NESTEROV', 'SGD-RMSPROP'):
            
            # Инициализируем историю для алгоритмов с моментами и RMSPROP
            v_prev = np.zeros(X.shape[1])
            
            for _ in range(int(max_iter)):
                
                self.epoch_count_ += 1
                
                # Перемешиваем обучающие данные
                X, y = self._shuffle(X, y)
                
                # Получаем разделенные значение X и y с количеством елементов в блоке равным cbatch
                X_batches, y_batches = self._make_splitted_batches(X=X, y=y, cbatch=cbatch)
                
                # Переменная для накопления ошибок при расчете каждого батча
                epoch_cost = 0 
                
                # Для каждого батча производим оптимизацию и расчет ошибки в зависимости от типа SGD
                for X_batch, y_batch in zip(X_batches, y_batches):
                    if method == 'SGD':
                        epoch_cost += self._update_weights_gd(X_batch, y_batch, eta=eta)
                    elif method == 'SGD-MOMENTUM':
                        cost, v_prev = self._update_weights_momentum(X_batch, y_batch, eta=eta, v_prev=v_prev, sigma=sigma)
                        epoch_cost += cost
                    elif method == 'SGD-NESTEROV':
                        cost, v_prev = self._update_weights_nesterov(X_batch, y_batch, eta=eta, v_prev=v_prev, sigma=sigma)
                        epoch_cost += cost
                    elif method == 'SGD-RMSPROP':
                        cost, v_prev = self._update_weights_rmsprop(X_batch, y_batch, eta=eta, v_prev=v_prev, sigma=sigma)
                        epoch_cost += cost
                
                # Считаем среднюю ошибку в батчах для эпохи
                cost = epoch_cost/len(X_batches)
                self.cost_.append(cost)
                
                # Если абсолютное значение разницы текущей ошибки и предыдущей меньше tol, то завершаем обучение
                if np.abs(cost_prev - cost) < tol:
                    return
                
                # Текущая ошибка становится предыдущей перед переходом к следующей эпохе
                cost_prev = cost 
    
    
    def _update_weights_gd(self, X, y, eta):
        """ Обновление весов методом градиентного спуска
        
        Параметры
        ---------
        X : {array-like}
          Матрица обучающих данных
        y : array-like
          Список категорий
        eta : float
          Скорость обучения модели.
          
        Результат
        ---------
        cost : float
          Размер ошибки для логистической регрессии
        """
        
        # Вычисляем функцию общего входа для каждого обучающего элемента
        net_input = self._net_input(X=X, w=self._w)
        
        # Вычисляем значение функции активации (сигмоида) для каждого обучающего элемента
        output = self._activation(net_input)
        
        # Обновляем веса по формуле градиента логистичесикй регрессии
        self._w += eta * X.T.dot(y-output)
        
        # Рассчитываем текущую ошибку
        cost = self._cost_calc(z=output, y=y)
        
        return cost  
    
    
    def _update_weights_rmsprop(self, X, y, eta, v_prev, sigma=0.9):
        """ Обновление весов методом RMSPROP
        
        Параметры
        ---------
        X : {array-like}
          Матрица обучающих данных
        y : array-like
          Список категорий
        eta : float
          Скорость обучения модели.
        sigma : number
          Коэфициент импульса формулы оптизизации rmsprop
          sigma * v_prev + (1 - sigma) * np.dot(gradient, gradient)
        v_prev : array-like
          Предыдущее значение формулы sigma * v_prev + (1 - sigma) * np.dot(gradient, gradient)
          
        Результат
        ---------
        cost, v_current : tuple
          Размер ошибки для логистической регрессии 
          и текущее значение формулы sigma * v_prev + (1 - sigma) * np.dot(gradient, gradient)
        """
        
        self._X = X
        eps = np.finfo(np.float64).eps
        
        # Вычисляем функцию общего входа для каждого обучающего элемента
        net_input = self._net_input(X=X, w=self._w)
        # Вычисляем значение функции активации (сигмоида) для каждого обучающего элемента
        output = self._activation(net_input)
        
        # Вычисляем градиент
        gradient = X.T.dot(y-output)
        self._gradient = gradient
        
        v_current = sigma * v_prev + (1 - sigma) * np.dot(gradient, gradient)
        # Обновляем веса по формуле RMSPROP
        self._w += (eta/np.sqrt(v_current + eps))*gradient
        
        # Рассчитываем текущую ошибку
        cost = self._cost_calc(z=output, y=y)
        
        return cost, v_current
    
    
    def _update_weights_momentum(self, X, y, eta, v_prev, sigma=0.9):
        """ Обновление весов методом градиентного спуска
        
        Параметры
        ---------
        X : {array-like}
          Матрица обучающих данных
        y : array-like
          Список категорий
        eta : float
          Скорость обучения модели.
        sigma : number
          Коэфициент импульса формулы оптизизации моментов
          sigma * v_prev + eta * X.T.dot(y-output)
        v_prev : array-like
          Предыдущее значение формулы sigma * v_prev + eta * X.T.dot(y-output)
          
        Результат
        ---------
        cost, v_current : tuple
          Размер ошибки для логистической регрессии и текущее значение формулы sigma * v_prev + eta * X.T.dot(y-output)
        """
        
        # Вычисляем функцию общего входа для каждого обучающего элемента
        net_input = self._net_input(X=X, w=self._w)
        # Вычисляем значение функции активации (сигмоида) для каждого обучающего элемента
        output = self._activation(net_input)
        
        v_current = sigma * v_prev + eta * X.T.dot(y-output)
        # Обновляем веса по формуле моментов
        self._w += v_current
        
        # Рассчитываем текущую ошибку
        cost = self._cost_calc(z=output, y=y)
        
        return cost, v_current
    
    
    def _update_weights_nesterov(self, X, y, eta, v_prev, sigma=0.9):
        """ Обновление весов методом градиентного спуска
        
        Параметры
        ---------
        X : {array-like}
          Матрица обучающих данных
        y : array-like
          Список категорий
        eta : float
          Скорость обучения модели.
        v_prev :  array-like
          Предыдущее значение формулы sigma*v_prev - eta * X.T.dot(y-output)
        sigma : number
          Коэффициент импульса формулы моментов Нестерова
          
        Результат
        ---------
        cost, v_current : tuple
          Размер ошибки для логистической регрессии и текущее вычисление sigma*v_prev - eta * X.T.dot(y-output)
        """
        
        # Вычисляем веса для места ближе к точке, где окажемся на след.шаге
        temp_w = self._w - sigma*v_prev
        
        # Вычисляем функцию общего входа для каждого обучающего элемента с модифицированными весами
        net_input = self._net_input(X=X, w=temp_w)
        # Вычисляем значение функции активации (сигмоида) для каждого обучающего элемента с модифицированными весами
        output = self._activation(net_input)
        
        v_current = sigma*v_prev - eta * X.T.dot(y-output)
        
        # Обновляем веса по формуле Нестерова
        self._w -= v_current
        
        # Для каждой эпохи расчитываем ошибку
        cost = self._cost_calc(z=output, y=y)
        
        return cost, v_current
    
    
    def _make_splitted_batches(self, X, y, cbatch):
        """ Разделение обучающих данных на батчи для градиентного спуска
        
        Параметры
        ---------
        X : {array-like}
          Матрица обучающих данных
        y : array-like
          Список категорий
        cbatch : int
          Количество элементов в каждом батче
          
        Результат
        ---------
        X_batches, y_batches: (list, list)
          Списки батчей для X и y
        """
        
        length_X = len(X)
        assert cbatch <= length_X, 'cbatch не должен превышать количества строк в X'

        # Блок разделения данных обучения на части. В каждом батче количество обучающих данных равно cbatch
        start_index = 0
        end_index = cbatch
        X_batches =[]
        y_batches = []
        for _ in range(length_X//cbatch):
            X_batches.append(X[start_index:end_index])
            y_batches.append(y[start_index:end_index])
            start_index += cbatch
            end_index += cbatch
        # Добавляем оставшиеся элементы, которые не были добавлены в цикле
        if len(X[start_index:]) > 0:
            X_batches.append(X[start_index:])
            y_batches.append(y[start_index:])
        
        return X_batches, y_batches
        

    def _cost_calc(self, z, y):
        """ Расчет ошибки для логистической регрессии
        
        Параметры
        ---------
        z : array-like or number
          Значение функции общего входа. Список или число.
        y : array-like
          Значение корректных категорий. Список или число.
          
        Результат
        ---------
        cost : number
          Ошибка логистической регрессии
        """
  
        return -np.dot(y, np.log(z)) - np.dot(1-y, np.log(1-z))
    
    
    def _init_weights(self, X):
        """ Инициализация весов
        """
        # Инициализируем веса близкими к 0 значениями
        self._w = self.__rand.normal(loc=0, scale=0.01, size=X.shape[1])
      
    
    def _shuffle(self, X, y):
        """ Перемешивание обучающих данных
        
        Параметры
        ---------
        X : {array-like}
          Матрица обучающих данных
        y : array-like
          Список категорий
          
        Результат
        ---------
        X, y: tuple
          Перемешанные X и соответствующие им значения y
        """
        r = self.__rand.permutation(len(X))
        return X[r], y[r]
      
        
    def _net_input(self, X, w=None):
        """ Вычисляет общий вход z
        
        Параметры
        ---------
        X : {array-like}
          Массив признаков
        w : array-like
          Веса. Если w = None, то будут взяты веса из self._w
          
        Результат
        ---------
        z : float
          Значение общего входа
        """
        if w is None:
            w = self._w
            
        return np.dot(X, w)
    
    
    def _activation(self, z):
        """ Выполняет сигмоидальную активацию
        
        Параметры
        ---------
        z : float
          значение общего входа
          
        Результат
        ---------
        f : float
          Значение сигмоидальной функции активации
        """
        # Ограничиваю сверху размером 36, так как при большем значении функция дает результат 1
        # и из-за этого невозможно расчитать ошибку, так как выражение np.log(1-1) выдает исключение
        return 1 / (1 + np.exp(-np.clip(z, -250, 36)))
    
    
    def predict(self, X):
        """ Предсказание категории
        
        Параметры
        ---------
        X : {array-like}
          Массив признаков
          
        Результат
        ---------
        predict : array-like
          Массив категорий для массива признаков
        """
        # Добавляем x0=1 в каждый обучающий образец
        X = np.c_[np.ones(X.shape[0]), X]
        
        return np.where(self._net_input(X) > 0.0, 1, 0) 

In [3]:
X, y = load_iris(return_X_y=True)
# Оставляем только признаки 0 и 1
X = X[:100]
y = y[:100]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
scaler = StandardScaler()
scaler.fit(X_train)
X_train_std = scaler.transform(X_train)
X_test_std = scaler.transform(X_test)

lr = LogisticRegression(random_state=42)

In [4]:
print("Метод градиентного спуска")
lr.fit(X_train, y_train, method='GD', tol=1e-3)
y_predict = lr.predict(X_test)
print('accuracy:', accuracy_score(y_test, y_predict))
print('Количество эпох:', lr.epoch_count_)

Метод градиентного спуска
accuracy: 1.0
Количество эпох: 184


In [5]:
print("Метод стохастического градиентного спуска")
lr.fit(X_train, y_train, method='SGD', cbatch=1, tol=1e-3)
y_predict = lr.predict(X_test)
print('accuracy:', accuracy_score(y_test, y_predict))
print('Количество эпох:', lr.epoch_count_)

Метод стохастического градиентного спуска
accuracy: 1.0
Количество эпох: 32


In [6]:
print("Метод стохастического градиентного спуска по стандартной формуле моментов")
lr.fit(X_train, y_train, method='SGD-MOMENTUM', cbatch=1, tol=1e-3, sigma=0.9)
y_predict = lr.predict(X_test)
print('accuracy:', accuracy_score(y_test, y_predict))
print('Количество эпох:', lr.epoch_count_)

Метод стохастического градиентного спуска по стандартной формуле моментов
accuracy: 1.0
Количество эпох: 10


In [7]:
print("Метод стохастического градиентного спуска по формуле моментов Нестерова")
lr.fit(X_train, y_train, method='SGD-NESTEROV', cbatch=1, tol=1e-3, sigma=0.9)
y_predict = lr.predict(X_test)
print('accuracy:', accuracy_score(y_test, y_predict))
print('Количество эпох:', lr.epoch_count_)

Метод стохастического градиентного спуска по формуле моментов Нестерова
accuracy: 1.0
Количество эпох: 11


In [8]:
print("Метод стохастического градиентного спуска по формуле RMSPROP")
lr.fit(X_train, y_train, method='SGD-RMSPROP', cbatch=1, tol=1e-3, sigma=0.9)
y_predict = lr.predict(X_test)
print('accuracy:', accuracy_score(y_test, y_predict))
print('Количество эпох:', lr.epoch_count_)

Метод стохастического градиентного спуска по формуле RMSPROP
accuracy: 1.0
Количество эпох: 32
