In [9]:
from importlib import reload
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

import os
import sys
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

In [10]:
%matplotlib inline
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sklearn.preprocessing import StandardScaler
from scipy.optimize import minimize

2023-11-17 09:52:07,454 - matplotlib.pyplot - DEBUG - Loaded backend module://matplotlib_inline.backend_inline version unknown.


# Define test data

In [11]:
# Generate predictors
X_raw = np.random.random(100*9)
X_raw = np.reshape(X_raw, (100, 9))

# Standardize the predictors
scaler = StandardScaler().fit(X_raw)
X = scaler.transform(X_raw)

# Add an intercept column to the model.
X = np.abs(np.concatenate((np.ones((X.shape[0],1)), X), axis=1))

# Define my "true" beta coefficients
beta = np.array([2,6,7,3,5,7,1,2,2,8])

# Y = Xb
Y_true = np.matmul(X,beta)

# Observed data with noise
Y = Y_true*np.exp(np.random.normal(loc=0.0, scale=0.2, size=100))

In [12]:
assert (Y.all()>=0 & (Y_true.all()>=0))

# Define loss function

In [13]:
def mean_square_root_error(y_pred, y_true, sample_weights=None):

    y_true_sqrt = np.sqrt(np.array(y_true))
    y_pred_sqrt = np.sqrt(np.array(y_pred))
    #print('Failed.', y_true, y_pred)
    assert len(y_true_sqrt) == len(y_pred_sqrt)
        
    if type(sample_weights) == type(None):
        return(np.mean(np.square(y_true_sqrt - y_pred_sqrt)))
    else:
        sample_weights = np.array(sample_weights)
        assert len(sample_weights) == len(y_true_sqrt)
        return(np.dot(sample_weights, (np.square(y_true_sqrt - y_pred_sqrt)))/sum(sample_weights))

def MSRE_loss(beta, X, Y, sample_weights):
    return mean_square_root_error(np.matmul(X,beta), Y, sample_weights=sample_weights)
    

# Define Model

In [14]:
class CustomLinearModel:
    """
    Linear model: Y = XB, fit by minimizing the provided loss_function
    with L1 regularization
    """
    def __init__(self, 
                 loss_function=None, 
                 X=None, 
                 Y=None, 
                 sample_weights=None, 
                 beta_init=None, 
                 regularization=0.00012):
        self.regularization = regularization
        self.beta = None
        self.residue_loss = loss_function
        self.sample_weights = sample_weights
        self.beta_init = beta_init
        
        self.X = X
        self.Y = Y
            
    
    def predict(self, X):
        prediction = np.matmul(X, self.beta)
        return(prediction)

    def model_error(self):
        error = self.residue_loss(
            self.predict(self.X), self.Y, sample_weights=self.sample_weights
        )
        return(error)
    
    def l1_regularized_loss(self, beta):
        self.beta = beta
        return(self.model_error() + \
               sum(self.regularization*np.array(self.beta)))
    
    def l2_regularized_loss(self, beta):
        self.beta = beta
        return(self.model_error() + \
               sum(self.regularization*np.array(self.beta)**2))
    
    def fit(self, maxiter=250):        
        if type(self.beta_init)==type(None):
            self.beta_init = np.array([1]*self.X.shape[1]) # default init: beta = 1
        else: 
            pass
            
        if self.beta!=None and all(self.beta_init == self.beta):
            logging.info("Model already fit once; continuing fit with more itrations.")
            
        res = minimize(self.l1_regularized_loss, self.beta_init,
                       method='TNC', options={'maxiter': maxiter},
                       bounds=[(0, None) for n in range(X.shape[1])])
        self.beta = res.x
        self.beta_init = self.beta


In [15]:
l2_msre_model = CustomLinearModel(
    loss_function=mean_square_root_error,
    X=X, Y=Y, regularization=0.01
)
l2_msre_model.fit()
l2_msre_model.beta

  res = minimize(self.l1_regularized_loss, self.beta_init,


array([4.77242623, 7.0151385 , 5.69839063, 2.91421513, 3.02609171,
       5.03206811, 0.8422265 , 3.63434853, 0.10349791, 6.60342754])

In [63]:
import optimization.inference
reload(optimization.inference)
from optimization.inference import CustomLinearModel
# Generate predictors
X_raw = np.random.random(100*9)
X_raw = np.reshape(X_raw, (100, 9))

# Standardize the predictors
scaler = StandardScaler().fit(X_raw)
X = scaler.transform(X_raw)

# Add an intercept column to the model.
X = np.abs(np.concatenate((np.ones((X.shape[0],1)), X), axis=1))

# Define my "true" beta coefficients
beta = np.array([26941161651,0,7545643210651,3489484,5154981321,7516613132,0,561648132132,2,1115158])

# Y = Xb
Y_true = np.matmul(X,beta)

# Observed data with noise
Y = Y_true*np.exp(np.random.normal(loc=0.0, scale=0.2, size=100))

sol = CustomLinearModel(residue_loss=mean_square_root_error,
                                    X=X,
                                    Y=Y_true,            
                                    reg_norm='l1',
                                    reg_param=0.1)
sol.fit(maxiter=15000,
        method='Nelder-Mead')
sol.beta

<module 'optimization.inference' from '/mnt/cmnfs/proj/ORIGINS/protMSD/maxquant/ScanByScan/optimization/inference.py'>

array([6.85834358e+11, 1.06059742e+12, 3.99403137e+11, 1.12412418e+12,
       1.26757140e+12, 5.91476871e+11, 3.28156038e+11, 2.57700513e+11,
       8.91142371e+08, 2.66637032e+11])

In [55]:
from sklearn.decomposition import sparse_encode
beta = sparse_encode(X = Y_true.reshape(-1, 1).T, 
                dictionary=X.T, 
                algorithm='lasso_cd',
                positive=True,  
                alpha=0.1,
                max_iter=250
                #n_jobs=-1
                )
beta

array([[1.78873744e+10, 0.00000000e+00, 7.54641791e+12, 1.99925088e+09,
        7.54811658e+09, 1.04636814e+10, 2.47563671e+09, 5.61386594e+11,
        1.42848537e+08, 0.00000000e+00]])

In [18]:
pd.DataFrame({
    "true_beta": beta, 
    "regularized_beta": l2_msre_model.beta
})[["true_beta",  "regularized_beta"]]

Unnamed: 0,true_beta,regularized_beta
0,2,4.772426
1,6,7.015139
2,7,5.698391
3,3,2.914215
4,5,3.026092
5,7,5.032068
6,1,0.842227
7,2,3.634349
8,2,0.103498
9,8,6.603428


# Cross Validation

In [None]:
from sklearn.model_selection import KFold

# Used to cross-validate models and identify optimal lambda
class CustomCrossValidator:
    
    """
    Cross validates arbitrary model using MAPE criterion on
    list of lambdas.
    """
    def __init__(self, X, Y, ModelClass,
                 sample_weights=None,
                 loss_function=MSRE_loss):
        
        self.X = X
        self.Y = Y
        self.ModelClass = ModelClass
        self.loss_function = loss_function
        self.sample_weights = sample_weights
    
    def cross_validate(self, lambdas, num_folds=10):
        """
        lambdas: set of regularization parameters to try
        num_folds: number of folds to cross-validate against
        """
        
        self.lambdas = lambdas
        self.cv_scores = []
        X = self.X
        Y = self.Y 
        
        # Beta values are not likely to differ dramatically
        # between differnt folds. Keeping track of the estimated
        # beta coefficients and passing them as starting values
        # to the .fit() operator on our model class can significantly
        # lower the time it takes for the minimize() function to run
        beta_init = None
        
        for lam in self.lambdas:
            print("Lambda: {}".format(lam))
            
            # Split data into training/holdout sets
            kf = KFold(n_splits=num_folds, shuffle=True)
            kf.get_n_splits(X)
            
            # Keep track of the error for each holdout fold
            k_fold_scores = []
            
            # Iterate over folds, using k-1 folds for training
            # and the k-th fold for validation
            f = 1
            for train_index, test_index in kf.split(X):
                # Training data
                CV_X = X[train_index,:]
                CV_Y = Y[train_index]
                CV_weights = None
                if type(self.sample_weights) != type(None):
                    CV_weights = self.sample_weights[train_index]
                
                # Holdout data
                holdout_X = X[test_index,:]
                holdout_Y = Y[test_index]
                holdout_weights = None
                if type(self.sample_weights) != type(None):
                    holdout_weights = self.sample_weights[test_index]
                
                # Fit model to training sample
                lambda_fold_model = self.ModelClass(
                    regularization=lam,
                    X=CV_X,
                    Y=CV_Y,
                    sample_weights=CV_weights,
                    beta_init=beta_init,
                    loss_function=self.loss_function
                )
                lambda_fold_model.fit()
                
                # Extract beta values to pass as beta_init 
                # to speed up estimation of the next fold
                beta_init = lambda_fold_model.beta
                
                # Calculate holdout error
                fold_preds = lambda_fold_model.predict(holdout_X)
                fold_msre = MSRE_loss(
                    beta=beta_init, 
                    X=holdout_X, 
                    Y=holdout_Y, 
                    sample_weights=holdout_weights
                )
                k_fold_scores.append(fold_msre)
                print("Fold: {}. Error: {}".format( f, fold_msre))
                f += 1
            
            # Error associated with each lambda is the average
            # of the errors across the k folds
            lambda_scores = np.mean(k_fold_scores)
            print("LAMBDA AVERAGE: {}".format(lambda_scores))
            self.cv_scores.append(lambda_scores)
        
        # Optimal lambda is that which minimizes the cross-validation error
        self.lambda_star_index = np.argmin(self.cv_scores)
        self.lambda_star = self.lambdas[self.lambda_star_index]
        print("\n\n**OPTIMAL LAMBDA: {}**".format(self.lambda_star))

# Test result

In [None]:
# User must specify lambdas over which to search
lambdas = [0, 1, 0.1, 0.01, 0.001, 0.0001, 0.00001, 0.000001]

cross_validator = CustomCrossValidator(
    X = X, Y = Y, 
    ModelClass=CustomLinearModel,
    loss_function=mean_square_root_error
)
cross_validator.cross_validate(lambdas, num_folds=5)

  res = minimize(self.l1_regularized_loss, self.beta_init,


Lambda: 0
Fold: 1. Error: 0.24161468288513116
Fold: 2. Error: 0.4440726849158496
Fold: 3. Error: 0.39231035399740993
Fold: 4. Error: 0.22418302737394336
Fold: 5. Error: 0.36152233065575695
LAMBDA AVERAGE: 0.3327406159656182
Lambda: 1
Fold: 1. Error: 7.813011419927335
Fold: 2. Error: 10.291948606588054
Fold: 3. Error: 9.998813189547157
Fold: 4. Error: 8.908148936872646
Fold: 5. Error: 10.942262793729359
LAMBDA AVERAGE: 9.59083698933291
Lambda: 0.1
Fold: 1. Error: 0.9529363268791888
Fold: 2. Error: 1.085995217827644
Fold: 3. Error: 0.9023950876505091
Fold: 4. Error: 0.6193653711344176
Fold: 5. Error: 0.860909121362684
LAMBDA AVERAGE: 0.8843202249708886
Lambda: 0.01
Fold: 1. Error: 0.5047102711225226
Fold: 2. Error: 0.24133242861437054
Fold: 3. Error: 0.31536502080120465
Fold: 4. Error: 0.3147332698657366
Fold: 5. Error: 0.42414583415011153
LAMBDA AVERAGE: 0.36005736491078916
Lambda: 0.001
Fold: 1. Error: 0.32076192890783306
Fold: 2. Error: 0.2937394958262522
Fold: 3. Error: 0.33604966941