In [1]:
!git clone https://github.com/m-zayan/ml_utils.git

!pip install memory-profiler

fatal: destination path 'ml_utils' already exists and is not an empty directory.


In [2]:
import numpy as np
import pandas as pd


# Solving SVM, dual problem  - SMO algorithm

# lower_bound = 0.0
# upper_bound | C = 1.0

In [3]:
%load_ext memory_profiler

import sys

import gc

sys.path.insert(0, './ml_utils')

from ml_utils.plots import Animation2D
from ml_utils.dataset import Load
from ml_utils.requests_utils.data_request import Writer


url = 'https://datahub.io/machine-learning/mnist_784/r/mnist_784.csv'
fname = 'mnist.csv'

Writer.download_from_url(url, to_path=f'./{fname}', chunk_size=1024)


In [4]:
loader = Load(dtype='numpy')

random_state = 42

# SMO

In [5]:
def compute_w(x, y, alpha):

  return np.dot(x, (y * alpha))

In [6]:
def compute_b(x, y, w):
  
  return (np.dot(x, w) - y).sum(axis=0)

In [7]:
def compute_u(x_i, y_i, alpha_i, b):

  w = compute_w(x_i, y_i, alpha_i)
  z = np.dot(x_i, w) - b
  
  return np.sign(z)

In [8]:
def compute_error_i(x_i, y_i, alpha_i, b):

  u_i = compute_u(x_i, y_i, alpha_i, b)
  
  return u_i - y_i

In [9]:
def check_kkt_conditions(y_i, u_i, alpha_i, C):

  solved_i = (alpha_i == 0 and y_i * u_i >= 1) or \
             (0 < alpha_i < C and y_i * u_i == 1) or \
             (alpha_i == C and y_i * u_i <= 1)
  
  return solved_i

In [10]:
# alpha_j - bounds, L, H - respectively

def constraints_bounds(y_i, y_j, alpha_i, alpha_j, C):

  if y_i == y_j:

    return max(0, alpha_j + alpha_i - C), min(C, alpha_j + alpha_i)
  
  else:

    return max(0, alpha_j - alpha_i), min(C, C + alpha_j - alpha_i)

In [11]:
def second_derivative(k_i, k_j, k_ij):

  # eta
  return k_i + k_j - 2 * k_ij 

In [12]:
 #  alpha_j clip
 
def correct_bounds_j(alpha_j, L, H):

  if alpha_j >= H:
    
    return H
  
  elif L < alpha_j < H:

    return alpha_j

  else: # alpha_j <= L

    return L

In [13]:
def update_alpha_j(y_i, y_j, alpha_i, alpha_j, e_i, e_j, eta, L, H):

  new_alpha_j = alpha_j + y_j* (e_i - e_j) / eta
  new_alpha_j = correct_bounds_j(new_alpha_j, L, H)

  return new_alpha_j

def update_alpha_i(y_i, y_j, alpha_i, alpha_j, new_alpha_j):

  # dir = y_i * y_j
  new_alpha_i = alpha_i + y_i * y_j * (alpha_j - new_alpha_j) 

  return new_alpha_i

In [14]:
# in case of second_derivative(...) is not positive definite, ex. x_i = x_j

def objective_function(k_i, k_j, k_ij, y_i, y_j, 
                       alpha_i, alpha_j, e_i, e_j, 
                       b, L, H):

  s = y_i * y_j # dir

  f_i = y_i * (e_i + b) - alpha_i * k_i - s * alpha_j * k_ij
  f_j = y_j * (e_j + b) - s * alpha_i * k_ij - alpha_j * k_j

  l_i = alpha_i + s * (alpha_j - L)
  h_i = alpha_i + s * (alpha_j - H)

  def at_line_segment(bnd_i, bnd):

    return bnd_i * f_i + \
           bnd * f_j + \
           0.5 * (bnd_i * bnd_i) * k_i + \
           0.5 * (bnd * bnd) * k_j + \
           s * bnd * bnd_i * k_ij

  obj_l = at_line_segment(l_i, L)
  obj_h = at_line_segment(h_i, H)

  return obj_l, obj_h

In [15]:
def poly_kernel(x_i, x_j, p=2): # p --> degree
  
  return (x_i @ x_j.T) ** p

def linear_kernel(x_i, x_j):

  return x_i @ x_j.T

In [16]:
class SVC:

  def __init__(self, X, Y, kernel_type='linear', gamma='scale', C=1.0, tol=1e-5):
    
    self.X = X
    self.Y = Y
    self.kernel_type = kernel_type

    self.gamma = gamma
    self.C = C

    self.tol = tol
    
    self.m, self.n = X.shape
    
    self.x_var = X.var()

    self.alpha = np.zeros((self.m, ))
    self.b = 0.0

    self.eps = 1e-3 # epsilon

    self.error_cache = {}
    self.kernel_cache = {}

  def compute_kernel(self, i, j):
    
    l = i * self.m + j
    keys = list(self.kernel_cache.keys())
    
    size = (len(keys) * 4) / (1024 * 1024)

    if size >= 400: # MB

      for k in range(len(keys) // 2):

        self.kernel_cache.pop(keys[k])

    if l not in self.kernel_cache:

      if self.kernel_type == 'rbf':

        pass # not implemented

      elif self.kernel_type == 'poly':

        self.kernel_cache[l] = poly_kernel(self.X[i], self.X[j], 2)
      
      elif self.kernel_type == 'linear':

        self.kernel_cache[l] = linear_kernel(self.X[i], self.X[j])
      
      else:

        raise ValueError('kernel_type - not found')

    return self.kernel_cache[l]

  def get_none_bounds(self):

    indices = np.where((self.alpha != 0.0) & (self.alpha != self.C))[0]

    return indices

  def take_step(self, i ,j):

    alpha_i, alpha_j = self.alpha[i], self.alpha[j]
    y_i, y_j = self.Y[i], self.Y[j]
    
    e_i, e_j = self.error_cache[i], self.error_cache[j]

    L, H = constraints_bounds(y_i, y_j, alpha_i, alpha_j, self.C)

    if L == H:

      return 0

    k_i = self.compute_kernel(i, i)
    k_j = self.compute_kernel(j, j)
    k_ij = self.compute_kernel(i, j)

    eta = second_derivative(k_i, k_j, k_ij)

    if eta > 0:

      new_alpha_j = update_alpha_j(y_i, y_j, alpha_i, alpha_j, e_i, e_j, eta, L, H)
    
    else:

      obj_l, obj_h = objective_function(k_i, k_j, k_ij, y_i, y_j, 
                                        alpha_i, alpha_j, e_i, e_j, self.b, L, H)
      
      if obj_l < obj_h - self.eps:
        
        new_alpha_j = L
      
      elif obj_l > obj_h + self.eps:

        new_alpha_j = H
      
      else:

        new_alpha_j = alpha_j
    
    if abs(alpha_j - new_alpha_j) < self.eps * (alpha_j + new_alpha_j + self.eps):
      
      return 0

    new_alpha_i = update_alpha_i(y_i, y_j, alpha_i, alpha_j, new_alpha_j)

    # update threshold - b

    b1 = self.b + e_i + \
        y_i * (new_alpha_i - alpha_i) * k_i + \
        y_j * (new_alpha_j - alpha_j) * k_ij


    b2 = self.b + e_j + \
          y_i * (new_alpha_i - alpha_i) * k_ij + \
          y_j * (new_alpha_j - alpha_j) * k_j

    
    # if 1 and 2 - b1 = b2

    if new_alpha_i != 0 and new_alpha_i != self.C: # 1

      self.b = b1

    elif new_alpha_j != 0 and new_alpha_j != self.C: # 2

      self.b = b2

    else:

      self.b = (b1 + b2) / 2.0

    # update error cache, for i, j
    self.error_cache[i] = compute_error_i(k_i, y_i, new_alpha_i, self.b)
    self.error_cache[j] = compute_error_i(k_j, y_j, new_alpha_j, self.b)

    if self.kernel_type == 'linear':

      pass # store w - update
    
    # update lagrange multipliers
    self.alpha[i], self.alpha[j] = new_alpha_i, new_alpha_j
    
    return 1

  # find 2nd lagrange multiplier eligible for optimization
  def find_j(self, i):
    
    e_i = self.error_cache[i]
    
    if e_i > 0:
      
      choice = (1e7, -1)

      for j in range(0, self.m):
        
        if i == j:
          
          continue

        k_j = self.compute_kernel(j, j)
        err_j = compute_error_i(k_j, self.Y[j], self.alpha[j], self.b)

        choice = min(choice, (err_j, j))


      return choice[1]
    
    else:

      choice = (-1, -1)

      for j in range(0, self.m):
        
        if i == j:

          continue

        k_j = self.compute_kernel(j, j)
        err_j = compute_error_i(k_j, self.Y[j], self.alpha[j], self.b)

        choice = max(choice, (err_j, j))

      return choice[1]

  def fit_svc(self, max_iter=100, heuristic='random'):

    # solve - main loop
    if heuristic == 'random':

      for k in range(max_iter):
        
        for i in range(0, self.m):

          k_i = self.compute_kernel(i, i)
          u_i = compute_u(k_i, self.Y[i], self.alpha[i], self.b)

          # at least one violated example  
          solved_i = check_kkt_conditions(self.Y[i], u_i, self.alpha[i], self.C)

          if solved_i:

            continue

          j = np.random.randint(0, self.m)
          k_j = self.compute_kernel(j, j)

          while i == j:
          
            j = np.random.randint(0, self.m)

            k_j = self.compute_kernel(j, j)

        self.error_cache[i] = compute_error_i(k_i, self.Y[i], self.alpha[i], self.b)
        self.error_cache[j] = compute_error_i(k_j, self.Y[j], self.alpha[j], self.b)
        
        self.take_step(i, j)
    
    elif heuristic == 'smo':

      for k in range(max_iter):

        start = np.random.randint(0, self.m)
        non_bounds = self.get_none_bounds()
        
        for i in non_bounds:
          
          k_i = self.compute_kernel(i, i)
          u_i = compute_u(k_i, self.Y[i], self.alpha[i], self.b)
          
          # at least one violated example
          solved_i = check_kkt_conditions(self.Y[i], u_i, self.alpha[i], self.C)

          if solved_i:
            
            continue


          self.error_cache[i] = compute_error_i(k_i, self.Y[i], self.alpha[i], self.b)

          # find 2nd lagrange multiplier eligible for optimization
          j = self.find_j(i)

          k_j = self.compute_kernel(j, j)

          self.error_cache[j] = compute_error_i(k_j, self.Y[j], self.alpha[j], self.b)

          self.take_step(i, j)

        for i in range(start, self.m):
          
          k_i = self.compute_kernel(i, i)
          u_i = compute_u(k_i, self.Y[i], self.alpha[i], self.b)
          
          self.error_cache[i] = compute_error_i(k_i, self.Y[i], self.alpha[i], self.b)

          solved_i = check_kkt_conditions(self.Y[i], u_i, self.alpha[i], self.C)

          if not solved_i:
        
            j = self.find_j(i)

            k_j = self.compute_kernel(j, j)

            self.error_cache[j] = compute_error_i(k_j, self.Y[j], self.alpha[j], self.b)

            self.take_step(i, j)

          else:

            continue
    else:

      raise ValueError('heuristic - not found')

  def predict(self, x_test):
    
    w = compute_w(self.X.T, self.Y[:, None], self.alpha[:, None])
    
    u = x_test @ w - self.b
    u = np.sign(u)

    neg = np.where(u == -1)[0]
    pos = np.where(u == 1)[0]

    u[neg] = 0
    u[pos] = 1

    return u

# Test Model

In [17]:
def one_hot_encoding(y, classes):

  encoded = (y.reshape(-1, 1) == classes).astype('float32')

  return encoded

In [18]:
def load_iris(test_size=0.1):

  global n_classes, n, x_train, y_train, x_test, y_test, m_train, m_test
  
  data = loader.load_train_test(dname='iris', test_size=test_size, random_state=random_state)
  
  x_train, y_train = data['train']
  
  x_test, y_test = data['test']
  y_test = y_test.squeeze()
  
  m_train, m_test = len(x_train), len(x_test)
  n = x_train.shape[-1]

  classes = np.array([0.0, 1.0, 2.0])
  n_classes = 3

  y_train = one_hot_encoding(y_train, classes)
  y_test = one_hot_encoding(y_test, classes)
  
  print('=' * 50)

  print('dim(x_train) :', x_train.shape)
  print('dim(y_train) :', y_train.shape)

In [19]:
%memit

try:

  del classes, n, x_train, y_train, x_test, y_test, theta, b
  
except:
  
  pass
  
load_iris(test_size=0.1)

peak memory: 145.60 MiB, increment: 0.02 MiB
{'Name': 'IRIS', 'n_samples': 150, 'n_features': 4}
dim(x_train) : (135, 4)
dim(y_train) : (135, 3)


In [20]:
x, y0 = x_train, y_train[:, 0]

y0[y0 == 0] = -1

In [21]:
svc = SVC(x, y0, kernel_type='poly')

svc.fit_svc(max_iter=1000, heuristic='smo')

In [22]:
y_pred = svc.predict(x_test)

acc = (y_pred.squeeze() == y_test[:, 0]).sum() / len(x_test)

print('accuracy score :', acc)

accuracy score : 0.9333333333333333
