In [26]:
# линейная регрессия, Lasso и Ridge регрессии и методы оптимизации
%%latex
$$ (1) y = \langle w, x \rangle$$
$$ (2) Y = Xw $$
$$ (3) w = (X^{T}X)^{-1}X^{T}Y $$
# (1) - запись линейной регрессии в виде скалярного произведения (с наличием байеса (то есть свободного)  члена w0, к которому мы добавили фиктивный признак, равный единице, чтобы удобно было представить в виде скалярного произведения). (2) запись лин рег в матричном виде, где X - матрица (вектор) признаков размера n на k, w - вектор весов размера k. (3) аналитическое решение для минимализации ошибки MSE для матричной формы

<IPython.core.display.Latex object>

In [41]:
# 1) АНАЛИТИЧЕСКОЕ РЕШЕНИЕ
import numpy as np
import pandas as pd
import sklearn as sk
from sklearn.linear_model import LinearRegression, Lasso, Ridge
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split


class MyLinearRegression:
    def __init__(self, fit_intercept = True):
        self.fit_intercept = fit_intercept # fit_intercept отвечает за наличие байеса 
    
    def fit(self, X, y):
        n, k = X.shape
        X_train = X
        if self.fit_intercept:
            X_train = np.hstack((X, np.ones((n, 1)))) # если есть байес, то добавляем фиктивную 1 
                                                      # для удобного вида в скалярном произведении
        
        self.w = np.linalg.inv(X_train.T @ X_train) @ X_train.T @ y
        
    def predict(self, X):
        n, k = X.shape
        
        if self.fit_intercept:
            X_train = np.hstack((X, np.ones((n, 1))))
            
        y_pred = X_train @ self.w
        return y_pred
    
    def get_weights(self):
        return self.w
    
def linear_expression(x):
    return 5 * x + 6


# недостатки аналитического решения: 1) посчитать inv обратную матрицу долго - куб операций
#                                    2) обратная матрица не всегда есть
# обратной матрицы может не быть, если мы имеем дело с вырожденной матрицей (в ней есть линейно 
# зависимые строки или столбцы). поэтому используется более серьезный метод - Градиентная оптимизация
# (градиентный спуск)


#стандартный (медленный вариант градиента)

class MyGradientLinearRegression(MyLinearRegression):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.w = None # итоговые веса
        
        
    def fit(self, X, y, shag = 0.01,  max_iter = 100):
        n, k = X.shape
        # случайно зададим веса
        if self.w is None:
            self.w = np.random.randn(k + 1 if self.fit_intercept else k)
        
        X_train = np.hstack((X, np.ones((n, 1)))) if self.fit_intercept else X
        
        self.losses = []
        
        for iter_num in range(max_iter):
            y_pred = self.predict(X)
            self.losses.append(mean_squared_error(y_pred, y))
            
            grad = self.calc_gradient(X_train, y)
            
            self.w -= shag * grad
            
        return self
    
    def calc_gradient(self, X, y):
        n, k = X.shape
        grad = (2 / n) * np.dot(X.T, (np.dot(X, self.w) - y))
        
        return grad
    
    def get_losses(self):
        return self.losses
    

# стохастический mini-batch. главное его отличие - это использование подвыборки для шага оптимизации
# (а не всей выборки, как в обычном спуске)

class StochasticMiniBatch(MyGradientLinearRegression):
    def __init__(self, n_sample = 10, **kwargs):
        super().__init__(**kwargs)
        self.w = None # итоговые веса
        self.n_sample = n_sample # кол-во элементов для подвыборки
    
    def calc_gradient(self, X, y):
        n, k = X.shape 
        inds = np.random.choice(np.arange(X.shape[0]), size = self.n_sample, replace = False)
        grad = (2 / n) * (np.dot(X.T, np.dot(X, self.w) - y))
        return grad

regressor = StochasticMiniBatch(fit_intercept=True)

l = regressor.fit(X_train[:, np.newaxis], y_train, max_iter=100).get_losses()

predictions = regressor.predict(X_test[:, np.newaxis])
w = regressor.get_weights()
w

array([4.83186993, 5.0042125 ])

In [27]:
%%latex #формула градиентного спуска в матричной форме:
$$\nabla Q(w) = \frac{2}{l}X^{T}(X_w - y)$$

<IPython.core.display.Latex object>

In [19]:
%%latex 
$$w^{t+1} = w^t - a_t\nabla Q(w^{t})$$ # правило обновления весов в градиентном спуске

<IPython.core.display.Latex object>

In [20]:
%%latex # ФОРМУЛА(2) # проверка для остановки в градиенте
$$w^{t} - w^{t-1} < e$$

<IPython.core.display.Latex object>

In [5]:
%%latex
$$Q(w) = \frac{1}{l}\sum_{i=1}^l(\langle w, x\rangle - y_i)$$

<IPython.core.display.Latex object>

In [None]:
# линейная регрессия в матричном (векторном виде)
# матричная форма намного быстрее обычного градиента
# всё, что меняется - это формула градиента в матричной форме
class LinearRegressionVectorized(BaseEstimator): 

    def __init__(self, epsilon=1e-4, w0 = None, max_steps = 1000, alpha=1e-2):
        self.epsilon = epsilon
        self.w0 = w0
        self.max_steps = max_steps
        self.alpha = alpha
        self.w = None
        self.w_history = []
    def fit(self, X, y):
        l, d = X.shape
        if self.w0 == None:
            self.w0 = np.zeros(d)
        self.w = w0
        for step in range(self.max_steps):
            self.w_history.append(self.w)
            w_new = self.w - self.alpha * self.calc_gradient(X, y)
            if np.linalg.norm(self.w_new - self.w) < epsilon:
                break
        self.w = self.w_new
        return self
    
    def calc_gradient_vectorized(self, X, y):
        l, d = X.shape
        return (2/l)*np.dot(X.T, (np.dot(X, self.w) - y))
        
    def predict(self, X):
        if self.w == None:
            raise Exception 'Модель не обучена'
        return np.dot(X, self.w)

In [None]:
## далее про линейный классификатор (обучение с градиентным спуском)
# функция потерь линейного классификатора - формула(1)
# градиент функции потерь линейного классификатора - формула(2)


In [17]:
%%latex
$$(1) Q(w,x) = \frac{1}{l}\sum_{i=1}^llog(1 + exp(-y_i\langle w, x_i\rangle))$$
$$(2) \nabla Q(w,x) = -\frac{1}{l}\sum_{i = 1}^l\frac{y_ix_i}{1 + exp(y_i\langle w,x_i \rangle)}$$

<IPython.core.display.Latex object>

In [10]:
import numpy as np

def log_loss(w, X, y):
    Q = np.log(1 + np.exp(-y * (w @ X))).mean()
    return Q

def calc_grad(w, X, y):
    Q_grad = -((X * y.reshape(-1, 1)) / (1 + np.exp(y * (w @ X))).reshape(-1, 1)).mean(axis = 0)
    return Q_grad

w_init = np.array([1.0, 1.0, 1.0])
X_new = np.c_[np.ones(n), X]

def gradient_descent_classifier(X, y, w_init, n_steps, eta):
    w = w_init.copy()
    loss_array = [log_loss(w_init, X, y)]
    for step in range(n_steps):
        w_grad = calc_grad(w_init, X, y)
        w -= eta * w_grad
        loss = log_loss(w, X, y)
        loss_array.append(loss)
    return loss_array, w

w, loss_array = gradient_descent_classifier(X_new, y, w_init, n_steps=1000, eta=0.1)

# from sklearn.linear_model import SGDClassifier - готовая реазилация с градиетным спуском


## вероятность классификация 
# модель вместо класса -1 или 1 возвращает вероятность принадлежности к определенному классу 
# чтобы из такого предсказания в итоге получить класс, нужно задать некоторый порог для вероятности


def predict_probabilities(X, w):
    return 1 / (1 + exp(-X @ w))

y_pred = predict_probabilities(X_new, w)

def check(y_pred, t):
    y_pred_classes = y_pred.copy()
    y_pred_classes[y_pred_classes > t] = 1
    y_pred_classes[y_pred_classes <= t] = -1
    return y_pred_classes

t = 0.5 # порог
# в зависимости от порога будут меняться Precision и Recall. найти оптимальный порог можно по 
# PR - кривой (Precision-Recall Curve)

#from sklearn.metrics import precision_recall_curve


ValueError: matmul: Input operand 1 has a mismatch in its core dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?) (size 500 is different from 3)

In [22]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import normalize
from sklearn.metrics import precision_score
from sklearn.metrics import roc_auc_score

data = pd.read_csv('heart.csv')

X_train, X_test, y_train, y_test = train_test_split(data.drop('target', axis = 1), data['target'], test_size=0.25, random_state=13)

clf = SGDClassifier(max_iter = 1000, learning_rate='constant', eta0 = 0.1, alpha = 0, loss = 'log', random_state = 13)
clf.fit(X_train, y_train)
clf.coef_ # смотрим полученные веса (без свободного кф-а)
clf.intercept_ # смотрим свободный коэффицент
np.linalg.norm(clf.coef_) # считаем L-2 норму 
ans = clf.predict(X_test)
score = precision_score(y_test, ans)
score

# теперь попробуем применить регуляризацию
# за регуляризацию (штраф ща большие веса, это препятствует переобучению модели)
# L1 - лассо регуляризация ; L2 - гребневая регуляризация
# в sklearn.linear_model.SGDClassifier - параметр регуляризации обозначается параметром alpha.
# За тип регуляризации (L1, L2 или обе сразу) отвечает параметр penalty.

clf_reg = SGDClassifier(max_iter=1000, random_state = 13, penalty = 'l1', alpha = 0.1, learning_rate = 'optimal')
clf_reg.fit(X_train, y_train)
ans_2 = clf_reg.predict(X_test)
score_2 = precision_score(y_test, ans_2)
np.linalg.norm(clf_reg.coef_)


res_2 = roc_auc_score(y_test, ans_2)
res_2

0.613107822410148