In [1]:
import numpy as np
import matplotlib.pyplot as plt

from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.linear_model import Lasso

np.random.seed(666)

In [2]:
def soft_threshold(a, b):
    return np.sign(a) * np.maximum((np.abs(a) - b), 0)

In [3]:
class LinRegLasso:
    __slots__ = ["__beta_tilde", "__gamma", "__max_iter", "__tol", "__mean_X", "__std_X", "__mean_y"]

    @property
    def coef_(self):
        return self.__beta_tilde

    @property
    def intercept_(self):
        return self.__mean_y
    
    def __init__(self, C=1.0, max_iter=1000, tol=0.0001):
        self.__beta_tilde = np.array([])
        
        self.__gamma = float("+inf") if C == 0 else 1 / C
        self.__max_iter = max_iter
        self.__tol = tol

        self.__mean_X = 0
        self.__std_X = 1
        self.__mean_y = 0
        
        return

    def fit(self, X, y):
        n, p = X.shape
        
        self.__mean_X = X.mean(axis=0)
        self.__std_X = X.std(axis=0)

        X_standarized = (X - self.__mean_X) / (np.sqrt(n - 1) * self.__std_X)

        self.__mean_y = y.mean()
        y_centered = y - self.__mean_y

        self.__beta_tilde = soft_threshold(X_standarized.T @ y_centered, self.__gamma)
        
        converged = False
        for _ in range(self.__max_iter):
            old_beta_tilde = self.__beta_tilde.copy()
            
            self.__beta_tilde = soft_threshold(self.__beta_tilde + X_standarized.T @ (y_centered - X_standarized @ self.__beta_tilde), self.__gamma)

            if np.linalg.norm(old_beta_tilde - self.__beta_tilde, 2) < self.__tol:
                converged = True
                
                break

        if not converged:
            print("algorithm did not converge")
        
        return self

    def predict(self, X):
        n, p = X.shape
        
        X_standarized = (X - self.__mean_X) / (np.sqrt(n - 1) * self.__std_X)
        
        return X_standarized @ self.__beta_tilde + self.__mean_y

In [4]:
def help_solve(d, a, b, c, lambda_a, lambda_b, lambda_c):
    if d + lambda_a + lambda_b + lambda_c <= a:
        return d + lambda_a + lambda_b + lambda_c
    elif a < d - lambda_a + lambda_b + lambda_c <= b:
        return d - lambda_a + lambda_b + lambda_c
    elif b < d - lambda_a - lambda_b + lambda_c <= c:
        return d - lambda_a - lambda_b + lambda_c
    elif c < d - lambda_a - lambda_b - lambda_c:
        return d - lambda_a - lambda_b - lambda_c

    return None

In [5]:
class LinRegFusedLasso:
    __slots__ = ["__beta_tilde", "__lambda_1", "__lambda_2", "__max_iter", "__tol", "__mean_X", "__std_X", "__mean_y"]

    @property
    def coef_(self):
        return self.__beta_tilde

    @property
    def intercept_(self):
        return self.__mean_y
    
    def __init__(self, lambda_1=1.0, lambda_2=1.0, max_iter=1000, tol=0.0001):
        self.__beta_tilde = None
        
        self.__lambda_1 = lambda_1
        self.__lambda_2 = lambda_2
        self.__max_iter = max_iter
        self.__tol = tol

        self.__mean_X = 0
        self.__std_X = 1
        self.__mean_y = 0
        
        return

    def __loss(self, beta, X, y):
        return 0.5 * ((y - X @ beta) ** 2).sum() + self.__lambda_1 * abs(beta).sum() + self.__lambda_2 * abs(beta[1:] - beta[:-1]).sum()

    def fit(self, X, y):
        n, p = X.shape
        
        self.__mean_X = X.mean(axis=0)
        self.__std_X = X.std(axis=0)

        X_standarized = (X - self.__mean_X) / (np.sqrt(n - 1) * self.__std_X)

        self.__mean_y = y.mean()
        y_centered = y - self.__mean_y

        self.__beta_tilde = np.zeros(p)

        converged = False
        for _ in range(self.__max_iter):
            old_beta_tilde = self.__beta_tilde.copy()

            d = X_standarized.T @ (y_centered - X_standarized @ self.__beta_tilde) + self.__beta_tilde

            for j in range(p):                    
                breakpoints = np.array([0, 0 if j == 0 else self.__beta_tilde[j - 1], 0 if j == p - 1 else self.__beta_tilde[j + 1]])
                penalties = np.array([self.__lambda_1, 0 if j == 0 else self.__lambda_2, 0 if j == p - 1 else self.__lambda_2])
                s = np.argsort(breakpoints)
                breakpoints = breakpoints[s]
                penalties = penalties[s]

                a, b, c = breakpoints[0], breakpoints[1], breakpoints[2]
                lambda_a, lambda_b, lambda_c = penalties[0], penalties[1], penalties[2]

                sol = help_solve(d[j], a, b, c, lambda_a, lambda_b, lambda_c)

                min_v = float("+inf")
                if sol is None:
                    for _breakpoint in breakpoitns:
                        self.__beta_tilde[j] = _breakpoint
    
                        v = self.__loss(self.__beta_tilde, X_standarized, y_centered)
                        if v < min_v:
                            min_v = v
                            sol = _breakpoint
                    
                self.__beta_tilde[j] = sol

            if np.linalg.norm(old_beta_tilde - self.__beta_tilde, 2) < self.__tol:
                converged = True
                
                break

        if not converged:
            print("algorithm did not converge")
        
        return self

    def predict(self, X):
        n, p = X.shape
        
        X_standarized = (X - self.__mean_X) / (np.sqrt(n - 1) * self.__std_X)
        
        return X_standarized @ self.__beta_tilde + self.__mean_y

In [6]:
n = 10000
p = 5

In [7]:
X = np.random.normal(20, 10, (n, p))
beta = np.random.randint(-4, 5, p)
y = X @ beta + 30

In [8]:
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.7)

In [9]:
model = LinRegLasso().fit(X_train, y_train)

In [10]:
print(model.intercept_, *model.coef_)

129.74521277857235 -1680.7257137792953 2491.426212103525 -0.0 838.0838473872583 2499.7178580732298


In [11]:
mean_squared_error(y_train, model.predict(X_train))

0.0005639646679870428

In [12]:
mean_squared_error(y_test, model.predict(X_test))

643.4759603548349

In [13]:
model_fused = LinRegFusedLasso().fit(X_train, y_train)

In [14]:
print(model_fused.intercept_, *model_fused.coef_)

129.74521277857235 -1679.7883745477761 2489.4368812842654 0.9277898795027519 838.1022903872771 2498.7104891738013


In [15]:
mean_squared_error(y_train, model_fused.predict(X_train))

0.002666511668795142

In [16]:
mean_squared_error(y_test, model_fused.predict(X_test))

641.3956419045485

In [17]:
_model = Lasso().fit(X_train, y_train)

In [18]:
print(_model.intercept_, *_model.coef_)

30.406426124648846 -1.9904433599233415 2.9898788957388054 -0.0 0.9901108613338742 2.9900278819160304


In [19]:
mean_squared_error(y_train, _model.predict(X_train))

0.03953900022388478

In [20]:
mean_squared_error(y_test, _model.predict(X_test))

0.0397956973212512