# Градиентный спуск для одной переменной и свободного члена

## Вступление<br>
Это второй ноутбук в серии, где я буду с нуля создавать базовые алгоритмы машинного обучения с объяснениями и дополнительными материлами.<br>
Так я сам разберусь в мелочах, а впоследствии, может помогу еще кому-нибудь<br>В ноутбуке [basic_gradient_descent.ipynb](https://github.com/konstantin-suspitsyn/algorithms-from-scratch/blob/main/basic_gradient_descent.ipynb) 

## Теоретическая часть

Теория аналогична той, что была в файле basic_gradient_descent.ipynb
<br><br>
$MSE = \frac{1}{n} \sum_{i=1}^n (y_i - \hat{y_i})^2 $<br>
Частные производные по $a_i$ равны $ \frac{\delta f}{\delta a} = \frac{1}{n} \sum_{i=1}^n-2*(y-(a_i^0*x_i^0+a_i^1*x_i^1+...+a_i^z*x_i^z))*x_i $, где $x^0=1$, а $a^0$ — свободный член, а сверху не степень, а порчдковый номер атрибута <br><br>

In [None]:
import random

In [None]:
class GradientDescents:
  
  '''
  Имплементация градиентного спуска с нуля
  '''

  def progress_tracker(self, step: int, cost_function: float) -> None:
    '''
    Функция позволяет отслеживать онлайн прогресс

    :param step: текущий шаг
    :param cost_function: значение кост-функции в данный момент

    '''
    from IPython.display import clear_output
    clear_output(wait=True)
    print('Шаг: {}'.format(step))
    print('Функция потерь: {:.2f}'.format(cost_function))

  def mse_function(self, y_true: list, y_pred: list) -> float:
    '''
    Функция, которая считет MSE

    :param y_true: значения y, которые мы знаем из фактических данных
    :param y_pred: значения y, которые мы получили в данный момент

    :return mse: значеник MSE по формуле
    '''
    # Кол-во значений, которое мы сравнивам
    n = len(y_true)
    # Стартуем с нуля
    pre_mse = 0
    for index, value in enumerate(y_true):
      pre_mse += (value - y_pred[index])**2
    mse = pre_mse/n
    return mse
  
  def gradient_descent(self, X_true: list, y_true: list, \
                       start_a: float = 1.0, start_b: float = 1.0, \
                       learning_rate: float = 0.003, max_steps: int =30000, \
                       save_steps: int = 0) -> dict:
    '''
    Простейший градиентный спуск для формул вида y=a*x+b

    :param start_a: стартовое значение a
    :param start_b: стартовое значение b
    :param learning_rate: коэффициент обучения
    :param max_steps: максимальное кол-во шагов, при которых алгоритм остановится
    :param save_steps: если 0, сохранится только последний шаг
                       если значение отличное от нуля, 
                       будет сохраняться каждый i-ый шаг

    :return a: значение a
    :return b: значение b
    :return mse: значение MSE по формуле
    :return mse_list: значение списка MSE по формуле для графика
    :return a_list: значение a для графика
    :return b_list: значение b для графика
    '''
    # Инициализируем первый шаг
    step = 0
    a = start_a
    b = start_b
    mse = 9999999
    mse_prev = 0

    # Сделаем трекинг обучения
    mse_list = []
    a_list = []
    b_list = []

    # Список посчитанных y для кост-функции
    y_pred = []
    # количество элементов в предсказаниях/реальных данных
    n=len(y_true)

    # Создаем переменные градиентов
    grad_a=0
    grad_b=0

    # модель будет работать пока мы не пройдем все шаги, 
    # или разница по MSE не будет меньше 1e-10
    # или разница между MSE и MSE_prev меньше 1e-10
    while (step <= max_steps) and (mse >= 1e-10) \
           and (abs(mse - mse_prev) >= 1e-5):
      
      mse_prev = mse
      # Вычисляем сдвиги, как было описано в теории
      for i, x in enumerate(X_true):
        grad_a += -2*(y_true[i] - (a*x + b))* x
        grad_b += -2*(y_true[i] - (a*x + b))
      grad_a = grad_a/n
      grad_b = grad_b/n
      # Делаем сдвиг в сторону противоположную градиенту с учетом lr
      a -= learning_rate*grad_a
      b -= learning_rate*grad_b
      # Считаем новые предасказания
      y_pred = [a*x+b for x in X_true]
      # Проверяем MSE
      mse = self.mse_function(y_true, y_pred)

      step += 1

      # Заполняем данные для отслеживания прогресса
      if save_steps > 0:
        if step % save_steps == 0:
          mse_list.append(mse)
          a_list.append(a)
          b_list.append(b)
      
      self.progress_tracker(step-1, mse)

    if save_steps > 0:
      return_dict = {'a': a, 'b': b, 'mse':mse, 'steps': step-1, \
            'mse_list': mse_list, 'a_list': a_list, 'b_list': b_list}
    else:
      return_dict = {'a': a, 'b': b, 'mse':mse, 'steps': step-1}

    return return_dict

Создаем синтетические данные

In [None]:
# Веса
s_w = [5, 12, 4]

In [None]:
X_true = []

for i in range(100):
  x_element = []
  for j in range(3):
    x_element.append(random.random())
  X_true.append(x_element)

In [None]:
y_true = []
for xt in X_true:
  y_true.append(sum([w*x for w,x in zip(s_w,xt)]))

In [None]:
old_gb = GradientDescents()

In [None]:
# Создаем рандомный спиок весов данных равный количеству атрибутов
weights = [random.random() for f in X_true[0]]

learning_rate = 0.001
mse=999

n = len(X_true)
st = 0
while mse>1e-5:
  # Считаем градиенты
  gradients = []
  for wi, w_value in enumerate(weights):
    current_gradient=0
    for yi, y_t_val in enumerate(y_true):
      current_gradient += -2*(y_t_val - sum([w*x for w,x in zip(weights,X_true[yi])]))* X_true[yi][wi]
    current_gradient = current_gradient/n
    gradients.append(current_gradient)

  # Делаем сдвиг весов
  for gi, gr_value in enumerate(gradients):
    weights[gi] = weights[gi] - learning_rate*gr_value

  #Считаем y_pred
  y_pred = []
  for X_current in X_true:
    y_pred.append(sum([w*x for w,x in zip(weights,X_current)]))
  st +=1
  mse = old_gb.mse_function(y_true, y_pred)
  old_gb.progress_tracker(st, mse)

In [None]:
weights

[5.000588636309069, 11.997679793629745, 4.001396190588351]

In [None]:
s_w

[5, 12, 4]

Алгоритм отработал. Веса почти сошлись

Доработка класса

In [None]:
class GradientDescents:
  
  '''
  Имплементация градиентного спуска с нуля
  '''

  import random

  def progress_tracker(self, step: int, cost_function: float) -> None:
    '''
    Функция позволяет отслеживать онлайн прогресс

    :param step: текущий шаг
    :param cost_function: значение кост-функции в данный момент

    '''
    from IPython.display import clear_output
    clear_output(wait=True)
    print('Шаг: {}'.format(step))
    print('Функция потерь: {:.2f}'.format(cost_function))

  def mse_function(self, y_true: list, y_pred: list) -> float:
    '''
    Функция, которая считет MSE

    :param y_true: значения y, которые мы знаем из фактических данных
    :param y_pred: значения y, которые мы получили в данный момент

    :return mse: значеник MSE по формуле
    '''
    # Кол-во значений, которое мы сравнивам
    n = len(y_true)
    # Стартуем с нуля
    pre_mse = 0
    for index, value in enumerate(y_true):
      pre_mse += (value - y_pred[index])**2
    mse = pre_mse/n
    return mse
  
  def gradient_descent(self, X_true: list, y_true: list, \
                       start_a: float = 1.0, start_b: float = 1.0, \
                       learning_rate: float = 0.003, max_steps: int =30000, \
                       save_steps: int = 0) -> dict:
    '''
    Простейший градиентный спуск для формул вида y=a*x+b

    :param start_a: стартовое значение a
    :param start_b: стартовое значение b
    :param learning_rate: коэффициент обучения
    :param max_steps: максимальное кол-во шагов, при которых алгоритм остановится
    :param save_steps: если 0, сохранится только последний шаг
                       если значение отличное от нуля, 
                       будет сохраняться каждый i-ый шаг

    :return return_dict: {
                :return a: значение a
                :return b: значение b
                :return mse: значение MSE по формуле
                :return steps: кол-во прошедших шагов
                :return mse_list: значение списка MSE по формуле для графика, если save_steps > 0
                :return a_list: значение a для графика, если save_steps > 0
                :return b_list: значение b для графика, если save_steps > 0
                        }
    '''
    # Инициализируем первый шаг
    step = 0
    a = start_a
    b = start_b
    mse = 9999999
    mse_prev = 0

    # Сделаем трекинг обучения
    mse_list = []
    a_list = []
    b_list = []

    # Список посчитанных y для кост-функции
    y_pred = []
    # количество элементов в предсказаниях/реальных данных
    n=len(y_true)

    # Создаем переменные градиентов
    grad_a=0
    grad_b=0

    # модель будет работать пока мы не пройдем все шаги, 
    # или разница по MSE не будет меньше 1e-10
    # или разница между MSE и MSE_prev меньше 1e-10
    while (step <= max_steps) and (mse >= 1e-10) \
           and (abs(mse - mse_prev) >= 1e-5):
      
      mse_prev = mse
      # Вычисляем сдвиги, как было описано в теории
      for i, x in enumerate(X_true):
        grad_a += -2*(y_true[i] - (a*x + b))* x
        grad_b += -2*(y_true[i] - (a*x + b))
      grad_a = grad_a/n
      grad_b = grad_b/n
      # Делаем сдвиг в сторону противоположную градиенту с учетом lr
      a -= learning_rate*grad_a
      b -= learning_rate*grad_b
      # Считаем новые предасказания
      y_pred = [a*x+b for x in X_true]
      # Проверяем MSE
      mse = self.mse_function(y_true, y_pred)

      step += 1

      # Заполняем данные для отслеживания прогресса
      if save_steps > 0:
        if step % save_steps == 0:
          mse_list.append(mse)
          a_list.append(a)
          b_list.append(b)
      
      self.progress_tracker(step-1, mse)

    if save_steps > 0:
      return_dict = {'a': a, 'b': b, 'mse':mse, 'steps': step-1, \
            'mse_list': mse_list, 'a_list': a_list, 'b_list': b_list}
    else:
      return_dict = {'a': a, 'b': b, 'mse':mse, 'steps': step-1}

    return return_dict

  def gradient_descent_multi(self, X_true: list, y_true: list, \
                              weights: list = None, max_steps: int = 10000, \
                              learning_rate: float = 0.003, \
                              save_steps: int = 0) -> dict:
    '''
    Градиентный спуск для нескольких переменных

    :param X_true: фактические аттрибуты
    :param y_true: фактические результаты
    :param weights: стартовые веса, если не хотим начать обучение с рандома
    :param learning_rate: коэффициент обучения
    :param max_steps: максимальное кол-во шагов, при которых алгоритм остановится
    :param save_steps: если 0, сохранится только последний шаг
                        если значение отличное от нуля, 
                        будет сохраняться каждый i-ый шаг
    
    '''
    # Создаем рандомный спиок весов данных равный количеству атрибутов
    if weights == None:
      weights = [self.random.random() for f in X_true[0]]

    if save_steps > 0:
      mse_list = []
      weights_list = []
    
    # MSE предыдущего шага
    mse_prev = 0
    mse = 999

    # Кол-во экспериментов, которые у нас есть      
    n = len(X_true)

    step = 0
    while (step <= max_steps) and (abs(mse_prev-mse)>1e-5):
      # Считаем градиенты
      gradients = []
      for wi, w_value in enumerate(weights):
        current_gradient=0
        for yi, y_t_val in enumerate(y_true):
          current_gradient += -2*(y_t_val - sum([w*x for w,x in \
                                                 zip(weights,X_true[yi])]))* X_true[yi][wi]
        current_gradient = current_gradient/n
        gradients.append(current_gradient)

      # Делаем сдвиг весов
      for gi, gr_value in enumerate(gradients):
        weights[gi] = weights[gi] - learning_rate*gr_value

      #Считаем y_pred
      y_pred = []
      for X_current in X_true:
        y_pred.append(sum([w*x for w,x in zip(weights,X_current)]))
      
      step +=1
      mse_prev = mse
      mse = self.mse_function(y_true, y_pred)
      self.progress_tracker(step, mse)

      if save_steps > 0:
        if step % save_steps == 0:
          mse_list.append(mse)
          weights_list.append(weights)

    if save_steps > 0:
      return_dict = {'weights': weights, 'mse':mse, 'steps': step-1, \
                      'mse_list': mse_list, 'weights_list': weights_list}
    else:
      return_dict = {'weights': weights, 'mse':mse, 'steps': step-1}

    return return_dict



Опробуем что получилось на Boston Dataset из sklearn

In [None]:
from sklearn.datasets import load_boston
X, y = load_boston(return_X_y=True)
X_true = []
for i in X:
  x_s_list = [f for f in i]
  # x_s_list.append(1)
  X_true.append(x_s_list)
y_true = [f for f in y]
del X, y

In [None]:
grad_count = GradientDescents()

In [None]:
grad = grad_count.gradient_descent_multi(X_true, y_true, learning_rate=0.000003, max_steps=10000)

Шаг: 10001
Функция потерь: 49.24


На самом деле, функция gradient_descent является частным вариантом функции gradient_descent_multi, поэтому добавим ее внутрь gradient_descent_multi

In [None]:
class GradientDescents:
  
  '''
  Имплементация градиентного спуска с нуля
  '''

  import random

  def progress_tracker(self, step: int, cost_function: float) -> None:
    '''
    Функция позволяет отслеживать онлайн прогресс

    :param step: текущий шаг
    :param cost_function: значение кост-функции в данный момент

    '''
    from IPython.display import clear_output
    clear_output(wait=True)
    print('Шаг: {}'.format(step))
    print('Функция потерь: {:.2f}'.format(cost_function))

  def mse_function(self, y_true: list, y_pred: list) -> float:
    '''
    Функция, которая считет MSE

    :param y_true: значения y, которые мы знаем из фактических данных
    :param y_pred: значения y, которые мы получили в данный момент

    :return mse: значеник MSE по формуле
    '''
    # Кол-во значений, которое мы сравнивам
    n = len(y_true)
    # Стартуем с нуля
    pre_mse = 0
    for index, value in enumerate(y_true):
      pre_mse += (value - y_pred[index])**2
    mse = pre_mse/n
    return mse

  def gradient_descent_multi(self, X_true: list, y_true: list, \
                              weights: list = None, max_steps: int = 10000, \
                              learning_rate: float = 0.003, \
                              save_steps: int = 0) -> dict:
    '''
    Градиентный спуск для нескольких переменных

    :param X_true: фактические аттрибуты
    :param y_true: фактические результаты
    :param weights: стартовые веса, если не хотим начать обучение с рандома
    :param learning_rate: коэффициент обучения
    :param max_steps: максимальное кол-во шагов, при которых алгоритм остановится
    :param save_steps: если 0, сохранится только последний шаг
                        если значение отличное от нуля, 
                        будет сохраняться каждый i-ый шаг
    
    '''

    # Код для данных с 1 аттрибутом
    if (type(X_true[0])==int) or (type(X_true[0])==float):
      for i, x in enumerate(X_true):
        X_true[i]=[x,1]
    elif (type(X_true[0])==list) and (len(X_true[0])==1):
      for i, x in enumerate(X_true):
        X_true[i].append(1)

    # Создаем рандомный спиок весов данных равный количеству атрибутов
    if weights == None:
      weights = [self.random.random() for f in X_true[0]]

    if save_steps > 0:
      mse_list = []
      weights_list = []
    
    # MSE предыдущего шага
    mse_prev = 0
    mse = 999

    # Кол-во экспериментов, которые у нас есть      
    n = len(X_true)

    step = 0
    while (step <= max_steps) and (abs(mse_prev-mse)>1e-5):
      # Считаем градиенты
      gradients = []
      for wi, w_value in enumerate(weights):
        current_gradient=0
        for yi, y_t_val in enumerate(y_true):
          current_gradient += -2*(y_t_val - sum([w*x for w,x in \
                                                 zip(weights,X_true[yi])]))* X_true[yi][wi]
        current_gradient = current_gradient/n
        gradients.append(current_gradient)

      # Делаем сдвиг весов
      for gi, gr_value in enumerate(gradients):
        weights[gi] = weights[gi] - learning_rate*gr_value

      #Считаем y_pred
      y_pred = []
      for X_current in X_true:
        y_pred.append(sum([w*x for w,x in zip(weights,X_current)]))
      
      step +=1
      mse_prev = mse
      mse = self.mse_function(y_true, y_pred)
      self.progress_tracker(step, mse)

      if save_steps > 0:
        if step % save_steps == 0:
          mse_list.append(mse)
          weights_list.append(weights)

    if save_steps > 0:
      return_dict = {'weights': weights, 'mse':mse, 'steps': step-1, \
                      'mse_list': mse_list, 'weights_list': weights_list}
    else:
      return_dict = {'weights': weights, 'mse':mse, 'steps': step-1}

    return return_dict

In [None]:
# Сначала, заполним x. От -10 до 10 с шагом 0.1
# И по нему построим y через формулу 5*x + 6

# X_true - признаки, которые мы будем подавать в функцию, чтобы получить y_pred
X_true = []
y_true = []

# range работает только с целыми. Умножим все на 10, а при append, поделим на 10
for i in range(-100, 100, 1):
  X_true.append(i/10)
  y_true.append(5*i/10+6)

In [None]:
new_grad = GradientDescents()

In [None]:
gr = new_grad.gradient_descent_multi(X_true, y_true)

Шаг: 869
Функция потерь: 0.00


In [None]:
gr

{'mse': 0.0008162586030876157,
 'steps': 868,
 'weights': [4.999955819944541, 5.971428689818899]}