In [1]:
from __future__ import annotations
from scipy.optimize import minimize

import os, sys
sys.path.insert(0, os.path.abspath('..'))

from typing import Literal, Callable

import numpy as np

from opml_math.functions import *
from MultiDimensionalOptimization.algorithms.nonlinear_cgm import nonlinear_cgm, HistoryMDO


class ModelCl:
    """
    Common interface for the classification models defined in this notebook.

    The methods here are placeholders: ``forward`` and ``fit`` do nothing and
    return ``None``; concrete models are expected to override them.
    """

    def __init__(self):
        """ Nothing to set up at the base level """

    def __call__(self, *args, **kwargs) -> float:
        """ Calling the model forwards every argument to ``forward`` """
        prediction = self.forward(*args, **kwargs)
        return prediction

    def forward(self, *args, **kwargs) -> float:
        """ Placeholder prediction step; returns ``None`` until overridden """
        return None

    def fit(self, *args, **kwargs) -> ModelCl:
        """ Placeholder training step; returns ``None`` until overridden """
        return None


class LogisticRegressionRBF(ModelCl):
    """
    Binary logistic regression on radial-basis-function (RBF) features.

    Each input row is mapped to one feature per basis centre by the selected
    RBF, the features are combined linearly with the weights ``w`` and the
    result is passed through ``logistic_func``. ``fit`` minimises the
    cross-entropy loss with ``nonlinear_cgm``.
    """

    def __init__(self, x_basis: np.ndarray, rbf: Literal['linear', 'gaussian'] = 'linear',
                 print_function: Callable = print):
        """
        :param x_basis: centers of basis functions, one center per row
        :param rbf: type of rbf function. Available: ['linear', 'gaussian']
        :param print_function: callable used to report progress during ``fit``
        :raises ValueError: if ``rbf`` is not one of the available names
        """
        super(LogisticRegressionRBF, self).__init__()
        # One weight per basis center, stored as a column vector.
        self.w = np.random.random((x_basis.shape[0], 1))
        if rbf == 'linear':
            self.rbf = linear_rbf
        elif rbf == 'gaussian':
            self.rbf = gaussian_rbf
        else:
            # Fail here instead of leaving ``self.rbf`` unset and crashing later
            # with an AttributeError inside ``make_phi_matrix``.
            raise ValueError(f"Unknown rbf {rbf!r}. Available: ['linear', 'gaussian']")

        self.x_basis = x_basis
        self.print = print_function

    def forward(self, x: np.ndarray, phi_matrix: np.ndarray = None) -> np.ndarray:
        """
        Returns a "probability" (confidence) of class 1

        :param x: 2D array, one sample per row (ignored when ``phi_matrix`` is given)
        :param phi_matrix: optional precomputed result of ``make_phi_matrix(x)``
        :return: ``logistic_func(phi_matrix @ w)``; a k x 1 column while ``w`` is
            stored as a column vector (presumably element-wise -- ``logistic_func``
            is defined outside this notebook)
        """
        if phi_matrix is None:
            phi_matrix = self.make_phi_matrix(x)

        return logistic_func(phi_matrix @ self.w)

    def fit(self, x: np.ndarray, y: np.ndarray, l1: bool = True, epochs: int = 1, batch_size: int=1,
            keep_history: bool = False, max_iter: int = 5) -> dict:
        """
        Minimize loss function

        :param x: 2D array of training samples, one per row
        :param y: labels; entries equal to 1 and 0 contribute to the loss
        :param l1: accepted for interface compatibility, currently unused
        :param epochs: number of consecutive ``nonlinear_cgm`` runs
        :param batch_size: accepted for interface compatibility, currently unused
        :param keep_history: accepted for interface compatibility, currently unused
            (the optimizer history is always collected because it feeds the result)
        :param max_iter: forwarded to ``nonlinear_cgm`` as ``max_iter`` for every epoch
        :return: dict with key 'loss_function' holding the optimizer's recorded
            'f_value' entries of all epochs, concatenated
        """
        # The features depend only on x and the basis, not on the weights, so
        # build them once instead of on every objective evaluation.
        phi_matrix = self.make_phi_matrix(x)

        def objective(w):
            # The loss reads the weights from the instance, so install the
            # optimizer's trial point before evaluating it.
            self.w = w
            return self.loss_function(x, y, phi_matrix)

        history = {'loss_function': []}
        for epoch in range(epochs):
            point, history_epoch = nonlinear_cgm(objective, self.w.reshape(-1),
                                                 max_iter=max_iter, keep_history=True)

            # Restore the column-vector layout used everywhere outside the optimizer.
            self.w = point['point'].reshape(-1, 1)

            history['loss_function'].extend(history_epoch['f_value'])
            self.print(f'epoch: {epoch: 4d}  |  loss: {history["loss_function"][-1] :0.5f}')
        return history

    def loss_function(self, x, y, phi_matrix: np.ndarray = None):
        """
        Cross-entropy loss

        :param x: 2D array of samples, one per row
        :param y: labels; entries equal to 1 and 0 contribute to the loss
        :param phi_matrix: optional precomputed ``make_phi_matrix(x)``
        :return: negative log-likelihood divided by ``len(x)``
        """
        # Flatten both sides so the boolean masks line up whether the weights are
        # currently a column vector (k x 1 output) or a flat vector (k output).
        p = np.reshape(self.forward(x, phi_matrix), -1)
        y = np.reshape(y, -1)
        # Keep probabilities strictly inside (0, 1) so a saturated prediction
        # yields a large finite loss instead of inf/nan.
        eps = np.finfo(float).eps
        p = np.clip(p, eps, 1 - eps)
        loss = np.log(p[y == 1]).sum()
        loss += np.log(1 - p[y == 0]).sum()
        return -loss / len(x)

    def make_phi_matrix(self, x: np.ndarray) -> np.ndarray:
        """
        Returns k x n array with calculated phi(x_i, x_basis_j)

        :param x: Array k x m dimensional. k different x_i and m features
        :return: array of shape (k, n) where n is the number of basis centers
        """
        phi = np.zeros((x.shape[0], self.x_basis.shape[0]))
        for i in range(x.shape[0]):
            for j in range(self.x_basis.shape[0]):
                # List indexing keeps each operand 2D (1 x m) for the rbf call.
                phi[i, j] = self.rbf(x[[i], :], self.x_basis[[j], :])

        return phi
    

class LogisticRegression(ModelCl):
    """
    Binary logistic regression on radial-basis-function (RBF) features,
    trained with ``scipy.optimize.minimize``.

    The feature matrix of the basis centers is computed once at construction
    and reused whenever the model is evaluated on those same centers.
    """

    def __init__(self, x_basis: np.ndarray, rbf: Literal['linear', 'gaussian'] = 'linear',
                 print_function: Callable = print):
        """
        :param x_basis: centers of basis functions, one center per row
        :param rbf: type of rbf function. Available: ['linear', 'gaussian']
        :param print_function: callable used to report progress during ``fit``
        :raises ValueError: if ``rbf`` is not one of the available names
        """
        # super() must name this class; naming the unrelated sibling class
        # raises TypeError because ``self`` is not an instance of it.
        super(LogisticRegression, self).__init__()
        # One weight per basis center, stored as a column vector.
        self.w = np.random.random((x_basis.shape[0], 1))
        if rbf == 'linear':
            self.rbf = linear_rbf
        elif rbf == 'gaussian':
            # Same numpy helper as LogisticRegressionRBF; this class only
            # handles numpy arrays.
            self.rbf = gaussian_rbf
        else:
            raise ValueError(f"Unknown rbf {rbf!r}. Available: ['linear', 'gaussian']")

        self.x_basis = x_basis
        self.print = print_function
        # Cached features of the basis centers themselves.
        self.phi_matrix = self.make_phi_matrix(x_basis)

    def _features(self, x: np.ndarray) -> np.ndarray:
        """ Feature matrix for ``x``, reusing the cache when ``x`` is the basis itself """
        if x is self.x_basis:
            return self.phi_matrix
        return self.make_phi_matrix(x)

    def forward(self, x: np.ndarray, phi_matrix: np.ndarray = None) -> np.ndarray:
        """
        Returns a "probability" (confidence) of class 1

        :param x: 2D array, one sample per row (ignored when ``phi_matrix`` is given)
        :param phi_matrix: optional precomputed result of ``make_phi_matrix(x)``
        :return: ``logistic_func(phi_matrix @ w)``; a k x 1 column while ``w`` is
            stored as a column vector (presumably element-wise -- ``logistic_func``
            is defined outside this notebook)
        """
        if phi_matrix is None:
            phi_matrix = self.make_phi_matrix(x)

        return logistic_func(phi_matrix @ self.w)

    def fit(self, x: np.ndarray, y: np.ndarray, l1: bool = True, epochs: int = 1, batch_size: int=1,
            keep_history: bool = False, max_iter: int = 5) -> dict:
        """
        Minimize loss function

        :param x: 2D array of training samples, one per row
        :param y: labels; entries equal to 1 and 0 contribute to the loss
        :param l1: accepted for interface compatibility, currently unused
        :param epochs: number of consecutive ``minimize`` runs, each started
            from the weights left by the previous one
        :param batch_size: accepted for interface compatibility, currently unused
        :param keep_history: accepted for interface compatibility, currently unused
        :param max_iter: accepted for interface compatibility; not forwarded to
            ``minimize``, which runs with its default stopping rules
        :return: dict with key 'loss_function' holding one loss value per epoch
        """
        # Features depend only on x and the basis, so compute them once.
        phi_matrix = self._features(x)

        def objective(w):
            # The loss reads the weights from the instance, so install the
            # optimizer's trial point before evaluating it.
            self.w = w
            return self.loss_function(x, y, phi_matrix)

        history = {'loss_function': []}
        for epoch in range(epochs):
            point = minimize(objective, np.reshape(self.w, -1))['x']

            # Store the optimum in the same column layout __init__ uses, so the
            # model keeps a consistent shape after training.
            self.w = point.reshape(-1, 1)
            history['loss_function'].append(self.loss_function(x, y, phi_matrix))
            self.print(f'epoch: {epoch: 4d}  |  loss: {history["loss_function"][-1] :0.5f}')
        return history

    def loss_function(self, x, y, phi_matrix: np.ndarray = None):
        """
        Cross-entropy loss

        :param x: 2D array of samples, one per row
        :param y: labels; entries equal to 1 and 0 contribute to the loss
        :param phi_matrix: optional precomputed ``make_phi_matrix(x)``; when
            omitted the features are derived from ``x`` (the cached basis
            features are used only if ``x`` is the basis array itself)
        :return: negative log-likelihood divided by ``len(x)``
        """
        if phi_matrix is None:
            phi_matrix = self._features(x)
        # Flatten both sides so the boolean masks line up whether the weights are
        # currently a column vector (k x 1 output) or a flat vector (k output).
        p = np.reshape(self.forward(x, phi_matrix), -1)
        y = np.reshape(y, -1)
        # Keep probabilities strictly inside (0, 1) so a saturated prediction
        # yields a large finite loss instead of inf/nan.
        eps = np.finfo(float).eps
        p = np.clip(p, eps, 1 - eps)
        loss = np.log(p[y == 1]).sum()
        loss += np.log(1 - p[y == 0]).sum()
        return -loss / len(x)

    def make_phi_matrix(self, x: np.ndarray) -> np.ndarray:
        """
        Returns k x n array with calculated phi(x_i, x_basis_j)

        :param x: Array k x m dimensional. k different x_i and m features
        :return: array of shape (k, n) where n is the number of basis centers
        """
        phi = np.zeros((x.shape[0], self.x_basis.shape[0]))
        for i in range(x.shape[0]):
            for j in range(self.x_basis.shape[0]):
                # List indexing keeps each operand 2D (1 x m) for the rbf call.
                phi[i, j] = self.rbf(x[[i], :], self.x_basis[[j], :])

        return phi

In [2]:
from sklearn.datasets import make_classification

In [25]:
# Tiny toy problem: three samples with two features; random_state pins the draw.
x, y = make_classification(
    n_samples=3,
    n_features=2,
    n_informative=1,
    n_redundant=0,
    n_clusters_per_class=1,
    random_state=1,
)

In [43]:
%%time
# The training points double as the RBF centers; each of the 20 epochs
# forwards max_iter=2 to the optimizer.
model = LogisticRegressionRBF(x_basis=x, rbf='linear')
_ = model.fit(x, y, max_iter=2, keep_history=True, epochs=20)

epoch:    0  |  loss: 0.43228
epoch:    1  |  loss: 0.38926
epoch:    2  |  loss: 0.35316
epoch:    3  |  loss: 0.32251
epoch:    4  |  loss: 0.29622
epoch:    5  |  loss: 0.27347
epoch:    6  |  loss: 0.25364
epoch:    7  |  loss: 0.23622
epoch:    8  |  loss: 0.22083
epoch:    9  |  loss: 0.20715
epoch:   10  |  loss: 0.19493
epoch:   11  |  loss: 0.18396
epoch:   12  |  loss: 0.17407
epoch:   13  |  loss: 0.16512
epoch:   14  |  loss: 0.15698
epoch:   15  |  loss: 0.14955
epoch:   16  |  loss: 0.14275
epoch:   17  |  loss: 0.13651
epoch:   18  |  loss: 0.13076
epoch:   19  |  loss: 0.12545
Wall time: 429 ms


In [44]:
# Display the generated feature matrix.
x

array([[-0.87369368,  1.74481176],
       [-1.33537055, -1.07296862],
       [ 0.9589962 , -0.7612069 ]])

In [46]:
# RBF features of the training points against the basis centers
# (the model above was built with x as its basis, so this is square).
model.make_phi_matrix(x)

array([[0., 1., 1.],
       [1., 0., 1.],
       [1., 1., 0.]])

In [48]:
# NOTE(review): `torch` is not imported in any cell visible in this notebook, so
# this presumably raises NameError on a fresh kernel; the array stored as this
# cell's output also does not look like the repr of `torch.tile` -- likely a
# stale output. Confirm and either import torch or drop the cell.
torch.tile

array([[-0.87369368,  1.74481176],
       [-1.33537055, -1.07296862],
       [ 0.9589962 , -0.7612069 ]])

In [47]:
# Squared Euclidean distance from the first sample to every sample.
c0 = np.square(x - x[0])
np.sum(c0, axis=1)

array([0.        , 8.15303184, 9.63888173])

In [48]:
# Class-1 confidence of the fitted model for every row of x.
# NOTE(review): the stored output lists 30 rows although x is generated with
# n_samples=3 above -- it looks stale; re-run on a fresh kernel to confirm.
p = model(x)
p

array([[0.41021467],
       [0.57046976],
       [0.41713568],
       [0.46320333],
       [0.64994492],
       [0.50434193],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312]])

In [49]:
# Cross-entropy of the model on the training data, averaged over len(x).
model.loss_function(x, y)

0.6667583005755058

In [50]:
# Re-display the class-1 confidences (same call as the `p = model(x)` cell).
model(x)

array([[0.41021467],
       [0.57046976],
       [0.41713568],
       [0.46320333],
       [0.64994492],
       [0.50434193],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312],
       [0.50359312]])