In [1]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, KFold
import seaborn as sns
from sklearn import datasets
from sklearn.base import BaseEstimator
#from sklearn.datasets import fetch_mldata, fetch_20newsgroups

from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import scipy
import scipy.stats as st



In [132]:
from sklearn.utils import shuffle
def batch_generator(X, y, shuffle_state =True, batch_size=1):
    
    n = X.shape[0]
    if shuffle_state:
        X, y = shuffle(X, y)
    for i in range(0, n, batch_size):
        try:
            yield (X[i:i + batch_size], y[i:i + batch_size])
        except IndexError:
            yield (X[i::], y[i::])

In [178]:
from sklearn.base import BaseEstimator, ClassifierMixin

class MyRidge(BaseEstimator, ClassifierMixin):
    
    def __init__(self, batch_generator, lambda_=0.1, alpha=0.01, max_epoch=1000, model_type='Linear', batch_size = 100):

        self.lambda_ = lambda_
        self.alpha = alpha
        self.max_epoch = max_epoch
        self.batch_generator = batch_generator
        self.errors_log = {'iter' : [], 'loss' : []}  
        self.model_type = model_type
        self.batch_size = batch_size
        
    
    def calc_loss(self, X_batch, Y_batch):
        m = X_batch.shape[0]
        L2 = self.lambda_/2 *(self.weights[1:] @ self.weights.reshape(-1,1)[1:])
        loss = ((Y_batch - X_batch @ self.weights.reshape(-1, 1))**2 + L2).sum()/m
        return loss
    
    
    
    def calc_loss_grad(self, X_batch, y_batch):

        m = X_batch.shape[0]
        predict = X_batch.dot(self.weights)
        loss_grad = (2 * (predict - y_batch).dot(X_batch) + \
                    self.lambda_ * np.hstack(([0],self.weights[1:]))) / m
        return loss_grad
    
    
    
    def update_weights(self, new_grad):
        self.weights -= new_grad * self.alpha 

    
    
    def fit(self, X, Y):
        
        self.weights = st.uniform.rvs(size = X.shape[1] + 1)
        for i in enumerate(range(self.max_epoch)):
            new_epoch_generator = self.batch_generator(X, Y, batch_size = self.batch_size)
            for batch in new_epoch_generator:
                X_batch = batch[0]
                w0vec = np.ones(X_batch.shape[0]).reshape(-1, 1)
                Y_batch = batch[1]
                X_batch = np.concatenate((w0vec, X_batch), axis = 1)
                loss_grad = self.calc_loss_grad(X_batch, Y_batch)
                self.update_weights(loss_grad)
                batch_loss = self.calc_loss(X_batch, Y_batch)
                if len(self.errors_log['loss']) >=2 and \
                np.absolute(self.errors_log['loss'][-1] - self.errors_log['loss'][-2]) < 0.001:
                    break
            self.errors_log['iter'].append(i[0])
            self.errors_log['loss'].append(batch_loss)
                
                
                
                

                
                
    
    def predict(self, X):
        w0vec = np.ones(X.shape[0]).reshape(-1, 1)
        X = np.concatenate((w0vec, X), axis = 1)
        return np.dot(X, self.weights.T)
       
    
        
        
        

In [179]:
from sklearn.datasets import make_regression
from sklearn.linear_model import Ridge

In [180]:
X, Y = make_regression(n_samples=100, n_features=2, n_informative = 2, n_targets=1, bias=0.6)

In [168]:
clf = MyRidge(batch_generator=batch_generator, alpha= 0.01, max_epoch=10000)

In [169]:
clf.fit(X, Y)

In [170]:
clf.errors_log['iter'][-1], len(clf.errors_log['loss'])

(9999, 10000)

In [171]:
clf1 = Ridge(alpha = 0.01)
clf1.fit(X, Y)
clf1.intercept_, clf1.coef_


(0.6009795767467843, array([80.36979905, 20.29236662]))

In [116]:
for i in enumerate(range(10)):
    print(i)

(0, 0)
(1, 1)
(2, 2)
(3, 3)
(4, 4)
(5, 5)
(6, 6)
(7, 7)
(8, 8)
(9, 9)
