In [5]:
import numpy as np
import pandas as pd
from scipy.stats import multivariate_normal

def generate_synth_dataset(p: float, n: int, d: int, g: float)->pd.DataFrame:
    """Generating a synthetic dataset
    Args:
        p (float): class prior probability
        n (int): number of observations
        d (int): number of features (dimensions)
        g (float): correlation between features
    Returns:
        Datadrame: dataset with d features, n rows
    """
    Y = np.random.binomial(1, p, size=n)
    S = np.array([[g ** abs(i - j) for j in range(d)] for i in range(d)])
    
    mean_0 = np.zeros(d)
    mean_1 = np.array([1/(i+1) for i in range(d)])
    
    X = np.array([
        multivariate_normal.rvs(mean=mean_1 if y == 1 else mean_0, cov=S)
        for y in Y
    ])
    
    feature_names = [f'f{i+1}' for i in range(d)]
    dataset = pd.DataFrame(X, columns=feature_names)
    dataset['Y'] = Y
    
    return dataset

In [None]:
from sklearn.linear_model import LogisticRegression

ban = LogisticRegression()

In [8]:
p = 0.5   # probability of class = 1
n = 200   # number of observations
d = 10    # number of features
g = 0.5   # correlation between features

dataset = generate_synth_dataset(p, n, d, g)
print(dataset.head())

         f1        f2        f3        f4        f5        f6        f7  \
0  0.808324  1.115406  0.353526  0.992571 -0.829391  2.038127  1.337364   
1  0.950230  1.927966  1.007024 -0.300203  0.686399  0.630842  1.352010   
2  0.966167 -0.555917  0.638940  0.417741  1.067720 -0.148997  0.808546   
3  3.121562  0.753335  2.102468  0.679483  0.384577  0.284980  1.029542   
4 -0.013053  0.459288  0.100058 -0.947503 -0.534097 -1.384567 -1.924794   

         f8        f9       f10  Y  
0  0.225057  0.186019 -0.609141  1  
1 -0.933978 -0.617282  0.816854  0  
2 -0.906178  0.170930  0.702563  1  
3  1.661278  1.463126  0.478464  1  
4 -1.913593 -0.286615 -1.641509  1  


In [10]:
from sklearn.model_selection import train_test_split

feature_cols =['f1','f2','f3','f4','f5','f6','f7','f8', 'f9','f10']
y = dataset.Y
X = dataset[feature_cols]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1)

X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, random_state=1) # 0.25 x 0.8 = 0.2


In [33]:
β = np.zeros(X_train.shape[1])
xi = X_train.iloc[0].values
print(xi)
print(β)
np.dot(xi, β)


[ 2.37092453  1.73095473  1.38565474  0.97960477  0.73031586 -0.52614245
  1.0550614   1.25398968 -0.94387923  0.92960952]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]


0.0

In [46]:
np.array(y_train)

array([1, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1,
       1, 1, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1,
       1, 1, 0, 0, 1, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0, 1, 1, 0, 1, 0, 1, 0,
       1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0,
       0, 1, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 1, 1, 1, 0, 1, 1, 0, 0])

In [63]:
from sklearn.metrics import roc_auc_score, average_precision_score, recall_score, precision_score, f1_score, balanced_accuracy_score

class LogRegCCD:
    def __init__(self, lambdas):
        self.lambdas = lambdas
        self.coef_path_ = []
        self.intercept_path_ = []
        self.best_lambda_ = None
        self.best_coef_ = None
        self.best_intercept_ = None

    def fit(self, X_train, y_train, alpha=1.0, tol=1e-8, max_iter=100):
        X = np.array(X_train)
        y = np.array(y_train)
        N, d = X.shape

        for l in self.lambdas:
            b = np.zeros(X_train.shape[1])
            b_0 = 0
            residual = y - 0.5
            for i in range(max_iter):
                b_old = b.copy()
                b0_old = b_0

                linear_comb = b_0 + X @ b
                p = 1 / (1 + np.exp(-linear_comb))
                # w = p * (1 - p)
                w = np.maximum(p * (1 - p), 1e-10)
                z = linear_comb + (y - p) / w
                lq = -0.5 * np.sum(w * (z - X @ b) ** 2)  # Compute current quadratic function

                b_0 = np.sum(w * (z - X @ b)) / np.sum(w) #new
                for j in range(X_train.shape[1]):
                    X_j = X[:, j]
                    r_j = residual + b[j] * X_j

                    numerator = np.sum(w * X_j * r_j)
                    denominator = np.sum(w * X_j**2) + l * (1 - alpha)
                    denominator = max(denominator, 1e-10)

                    new_bj = self.soft_threshold(numerator, l * alpha) / denominator

                    if np.abs(new_bj - b[j]) > tol:
                      residual -= (new_bj - b[j]) * X_j  # Update
                    # print(b)
                    # print(b[j])
                    b[j] = new_bj
                    # print(f"Iteration {i}, Feature {j}, Numerator: {numerator:.5f}, Denominator: {denominator:.5f}")
                    # print(f"new bj {new_bj}")
                # print(b_0)
                if np.max(np.abs(b - b_old)) < tol and np.abs(b_0 - b0_old) < tol:
                    break
            # print(l)
            self.coef_path_.append(b.copy())
            self.intercept_path_.append(b_0)

    def validate(self, X_valid, y_valid, measure='roc_auc'):
        best_score = -np.inf
        best_lambda = None
        best_index = None

        for i, l in enumerate(self.lambdas):
          b = self.coef_path_[i]
          b_0 = self.intercept_path_[i]
          probas = 1 / (1 + np.exp(-(b_0 + X_valid @ b)))
          if measure in ['recall', 'precision', 'f_measure', 'balanced_accuracy']:
            predictions = (probas >= 0.5).astype(int)
            score = self.compute_measure(y_valid, predictions, measure)
          elif measure == 'roc_auc':
            score = roc_auc_score(y_valid, probas)
          elif measure == 'sensitivity_precision_auc':
              score = average_precision_score(y_valid, probas)

          if score > best_score:
              best_score = score
              best_lambda = l
              best_index = i
        self.best_lambda_ = best_lambda
        self.best_coef_ = self.coef_path_[best_index]
        self.best_intercept_ = self.intercept_path_[best_index]

    def predict_proba(self, X_test):
        # Predict probabilities using best_lambda_ coefficients
        pass

    def plot(self, X_valid, y_valid, measure='roc_auc'):
        # Plot performance measure vs lambda
        pass

    def plot_coefficients(self):
        # Plot coefficient paths vs lambda
        pass

    def soft_threshold(self, z, gamma):
        if z > gamma:
            return z - gamma
        elif z < -gamma:
            return z + gamma
        else:
            return 0

    def compute_measure(self, y_true, y_pred, measure):
      if measure == "recall":
          return recall_score(y_true, y_pred)
      elif measure == "precision":
          return precision_score(y_true, y_pred)
      elif measure == "f_measure":
          return f1_score(y_true, y_pred)
      elif measure == "balanced_accuracy":
          return balanced_accuracy_score(y_true, y_pred)
      else:
          raise ValueError(f"Unknown measure: {measure}")

In [67]:
lambdas = np.linspace(0.001, 0.000001, 100)  # explicitly 100 lambda values from 0.001 to 0.000001
model = LogRegCCD(lambdas=lambdas)
model.fit(X_train, y_train)
model.validate(X_val, y_val)
# print(model.intercept_path_)
print(model.best_coef_)


[ 0.18422399  0.00347195  0.00170952  0.01924258  0.01199868  0.01228861
  0.09064358 -0.02433651 -0.06089889 -0.06490071]
