Функция потерь Хьюбера
$$
L_\delta(a,y)=
\begin{cases}
 \frac{1}{2}(y - a)^2,                   & |y - a| \le \delta, \\
 \delta\, |y - a| - \frac{1}{2}\delta^2 & \textrm{иначе.}
\end{cases}
$$
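Производная функции потерь по вектору весов $\omega$ для одного объекта $x_i$ с предсказанием $a_i = x_i^T\omega$ (градиент по всей выборке — среднее этих выражений по объектам):

$$
\frac{\partial L_\delta(a_i, y_i)}{\partial\omega}=
\begin{cases}
 (a_i - y_i)\, x_i,                                  & |y_i - a_i| \le \delta, \\
 \delta\, \operatorname{sign}(a_i - y_i)\, x_i,      & \textrm{иначе.}
\end{cases}
$$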

In [None]:
from sklearn.base import BaseEstimator
import numpy.linalg as la
import numpy as np

In [None]:
class HuberReg(BaseEstimator):
    """Linear model ``X @ w`` fitted by minimising the mean Huber loss.

    For a residual ``r = X @ w - y`` the per-sample loss is ``0.5 * r**2``
    when ``|r| <= delta`` and ``delta * |r| - 0.5 * delta**2`` otherwise.
    Training is gradient descent with momentum, either on the whole data set
    (``gd_type='full'``) or on random mini-batches (``gd_type='stochastic'``).
    """

    # Step-size schedule used by the descent loop:
    # step_k = _STEP_SIZE_0 / (k + 1) ** _STEP_DECAY, k = 0, 1, ...
    _STEP_SIZE_0 = 0.045
    _STEP_DECAY = 0.51

    def __init__(self, delta=1.0, gd_type='stochastic',
                 tolerance=1e-4, max_iter=1000, w0=None, alpha=1e-3, batch_size=10):
        """
        delta: threshold between the quadratic and the linear part of the loss
        gd_type: 'full' or 'stochastic'
        tolerance: for stopping gradient descent
        max_iter: maximum number of steps in gradient descent
        w0: np.array of shape (d) - init weights (zeros when None)
        alpha: momentum coefficient
        batch_size: number of rows drawn per step when gd_type is 'stochastic'
        """
        self.delta = delta
        self.gd_type = gd_type
        self.tolerance = tolerance
        self.max_iter = max_iter
        self.w0 = w0
        self.alpha = alpha
        self.w = None
        self.batch_size = batch_size
        # list of loss function values at each training iteration
        self.loss_history = None

    @staticmethod
    def _check_training_data(X, y):
        """Return (X, y) as float arrays: X 2-D, y flattened to 1-D; validate shapes."""
        X = np.asarray(X, dtype=float)
        y = np.asarray(y, dtype=float).ravel()
        if X.ndim != 2:
            raise ValueError('X must be a 2-D array of shape (n_samples, n_features)')
        if X.shape[0] == 0:
            raise ValueError('X must contain at least one sample')
        if X.shape[0] != y.shape[0]:
            raise ValueError('X and y must have the same number of samples')
        return X, y

    def _gradient(self, X, y, w):
        """Return the gradient of the mean Huber loss at weights ``w``."""
        residual = np.dot(X, w) - y
        # Clipping the residual to [-delta, delta] yields `r` in the quadratic
        # zone and `delta * sign(r)` in the linear zone, sample by sample.
        return np.dot(X.T, np.clip(residual, -self.delta, self.delta)) / y.shape[0]

    def calc_loss(self, X, y):
        """Return the mean Huber loss of the current weights on (X, y).

        Raises RuntimeError if the weights have not been set yet.
        """
        if self.w is None:
            raise RuntimeError('Not trained yet')
        residual = np.dot(np.asarray(X, dtype=float), self.w) - np.asarray(y, dtype=float).ravel()
        abs_residual = np.abs(residual)
        quadratic = 0.5 * residual ** 2
        linear = self.delta * abs_residual - 0.5 * self.delta ** 2
        # The branch is chosen per sample, not from the norm of the whole vector.
        return float(np.mean(np.where(abs_residual <= self.delta, quadratic, linear)))

    def calc_gradient(self, X, y):
        """Run the gradient descent on (X, y) and return the fitted weights.

        Despite its name this method performs the whole optimisation; the name
        and the return value are kept for existing callers.  It starts from
        ``w0`` (zeros when ``w0`` is None), stores the result in ``self.w``
        and appends one loss value per iteration to ``self.loss_history``
        (the loss on the data used for that step: all rows for 'full',
        the mini-batch for 'stochastic').

        Raises ValueError for an unknown ``gd_type``, a non-positive ``delta``,
        a ``batch_size`` below 1 in stochastic mode, or inconsistent shapes.
        """
        X, y = self._check_training_data(X, y)
        if self.gd_type not in ('full', 'stochastic'):
            raise ValueError("gd_type must be 'full' or 'stochastic'")
        if self.delta <= 0:
            raise ValueError('delta must be positive')
        stochastic = self.gd_type == 'stochastic'
        if stochastic and self.batch_size < 1:
            raise ValueError('batch_size must be at least 1')

        n_samples, n_features = X.shape
        if self.w0 is None:
            self.w = np.zeros(n_features)
        else:
            # np.array copies, so the caller's w0 is never modified.
            self.w = np.array(self.w0, dtype=float).ravel()
            if self.w.shape[0] != n_features:
                raise ValueError('w0 must have one weight per column of X')
        if self.loss_history is None:
            # Allows calling this method directly, without going through fit().
            self.loss_history = []

        velocity = np.zeros(n_features)
        for i in range(self.max_iter):
            if stochastic:
                # Rows are drawn with replacement from the global NumPy RNG.
                rows = np.random.randint(n_samples, size=self.batch_size)
                X_step, y_step = X[rows], y[rows]
            else:
                X_step, y_step = X, y

            step_size = self._STEP_SIZE_0 / (i + 1) ** self._STEP_DECAY
            # Refresh the momentum buffer first, then move the weights by it,
            # so the gradient just computed is the one that gets applied.
            velocity = self.alpha * velocity + step_size * self._gradient(X_step, y_step, self.w)
            self.w = self.w - velocity
            self.loss_history.append(self.calc_loss(X_step, y_step))

            # `velocity` is exactly w_old - w_new: stop once the weights barely move.
            if la.norm(velocity) < self.tolerance:
                break
        return self.w

    def fit(self, X, y):
        """Train on (X, y), resetting ``loss_history``; return ``self``."""
        self.loss_history = []
        self.calc_gradient(X, y)
        return self

    def predict(self, X):
        """Return ``X @ w``; raise RuntimeError if the model is not trained."""
        if self.w is None:
            raise RuntimeError('Not trained yet')
        return np.dot(X, self.w)

    def score(self, X, y):
        """Return the coefficient of determination R^2 of the predictions on (X, y).

        Computed as ``1 - SS_res / SS_tot``; the ratio is undefined when ``y``
        is constant (``SS_tot == 0``).
        """
        y = np.asarray(y, dtype=float).ravel()
        residual_ss = ((y - self.predict(X)) ** 2).sum()
        total_ss = ((y - np.mean(y)) ** 2).sum()
        return 1 - residual_ss / total_ss