In [238]:
import numpy as np
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.linear_model import Ridge, LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

In [239]:
# Сгенерируем набор данных
n_objects = 100000
n_features = 500
np.random.seed(42)
sgd_steps = 100000
gd_steps = 1000

weights_true = np.random.normal(size=n_features)
X = np.random.uniform(-50, 50, (n_objects, n_features))
Y = np.dot(X, weights_true) + np.random.normal(0, 1, n_objects)

## Ridge с градиентным спуском 
Работает на уровне с моделью из scikitlearn (отличие на уровне 5 знака после запятой)

In [240]:
class RidgeReg(BaseEstimator, RegressorMixin):
    def __init__(self, n_steps, learning_rate=0.01, lambda_parameter=1):
        self.n_steps = n_steps
        self.learning_rate = learning_rate
        self.lambda_parameter = lambda_parameter
        
    def fit(self, X, Y):
        np.random.seed(42)
        weights = np.random.uniform(-1, 1, X.shape[1])
        for step in range(self.n_steps):
            learning_rate = self.learning_rate
            weights -= -2 * learning_rate * ((X.T.dot(Y - np.dot(X, weights)) / Y.size) - 
                                             (self.lambda_parameter * weights / weights.size))
        self.weights = weights
        return self
    
    def predict(self, X):
        return np.dot(X, self.weights)

In [241]:
scaler = StandardScaler()
scaler.fit(X)
X_scaled = scaler.transform(X)
X_train, X_test, Y_train, Y_test = train_test_split(X_scaled, Y)

In [242]:
Ridge_my = RidgeReg(n_steps=gd_steps).fit(X_train, Y_train)
prediction = Ridge_my.predict(X_test)
r2 = r2_score(Y_test, prediction)
sk_Ridge = Ridge().fit(X=X_train, y=Y_train)
sk_prediction = sk_Ridge.predict(X_test)
sk_r2 = r2_score(Y_test, sk_prediction)

print("R^2 моей модели:", r2)
print("R^2 модели из sklearn:", sk_r2)

R^2 моей модели: 0.9999837497180591
R^2 модели из sklearn: 0.9999975142503074


## Модель со стохастическим градиентным спуском
Работает хуже модели из scikitlearn (отличие на уровне сотых долей), зато работает быстрее модели с классическим градиентным спуском.

In [243]:
class RidgeRegSGD(BaseEstimator, RegressorMixin):
    def __init__(self, n_steps, batch_size=20, learning_rate=0.1, lambda_parameter=1):
        self.n_steps = n_steps
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.lambda_parameter = lambda_parameter
        
    def fit(self, X, Y):
        np.random.seed(42)
        weights = np.random.uniform(-1, 1, X.shape[1])
        for step in range(self.n_steps):
            learning_rate = self.learning_rate
            learning_rate *= 1 / (step + 1)
            n_objects = X.shape[0]
            ind = np.random.randint(0, n_objects, self.batch_size)
            weights -= -2 * learning_rate * ((X[ind].T.dot(Y[ind] - np.dot(X[ind], weights)) / Y[ind].size) - 
                                             (self.lambda_parameter * weights / weights.size))
        self.weights = weights
        return self
    
    def predict(self, X):
        return np.dot(X, self.weights)

In [244]:
scaler = StandardScaler()
scaler.fit(X)
X_scaled = scaler.transform(X)
X_train, X_test, Y_train, Y_test = train_test_split(X_scaled, Y)

In [245]:
Ridge_my = RidgeRegSGD(n_steps=sgd_steps).fit(X_train, Y_train)
prediction = Ridge_my.predict(X_test)
r2 = r2_score(Y_test, prediction)

sk_Ridge = Ridge().fit(X=X_train, y=Y_train)
sk_prediction = sk_Ridge.predict(X_test)
sk_r2 = r2_score(Y_test, sk_prediction)
print("R^2 моей модели:", r2)
print("R^2 модели из sklearn:", sk_r2)

R^2 моей модели: 0.9680646428370597
R^2 модели из sklearn: 0.9999974837265182


## Логистическая регрессия 

In [246]:
class LogReg(BaseEstimator, RegressorMixin):
    def __init__(self, n_steps, learning_rate=0.01):
        self.n_steps = n_steps
        self.learning_rate = learning_rate
    
    @staticmethod
    def sigmoid(vec):
        return 1 / (1 + np.exp(-vec))
        
    def fit(self, X, Y):
        np.random.seed(42)
        weights = np.random.uniform(-1, 1, X.shape[1])
        for step in range(self.n_steps):
            learning_rate = self.learning_rate
            weights -= learning_rate * X.T.dot((self.sigmoid(np.dot(X, weights)) - Y)) / Y.size
        self.weights = weights
        return self
    
    def predict(self, X):
        return np.array(self.sigmoid(np.dot(X, self.weights)) > 0.5, dtype=np.int)

In [250]:
#Загрузим данные из датасета Ирис
X, y = load_iris(return_X_y=True)
y = y[y != 2]
X = X[:100]
scaler = StandardScaler()
scaler.fit(X)
X_scaled = scaler.transform(X)

In [248]:
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y)

In [249]:
logreg = LogReg(n_steps=1000).fit(X_train, y_train)
prediction = logreg.predict(X_test)
accuracy_score(prediction, y_test)
sk_logreg = LogisticRegression().fit(X_train, y_train)
sk_prediction = sk_logreg.predict(X_test)
accuracy_score(sk_prediction, y_test)
print("accuracy score моей модели:", accuracy_score(prediction, y_test))
print("accuracy score модели из sklearn:", accuracy_score(sk_prediction, y_test))

accuracy score моей модели: 1.0
accuracy score модели из sklearn: 1.0
