get and format dataset

In [2]:
# load in wine 
import pandas as pd

# Define column names
column_names = [
    "Class", "Alcohol", "Malicacid", "Ash", "Alcalinity_of_ash", "Magnesium",
    "Total_phenols", "Flavanoids", "Nonflavanoid_phenols", "Proanthocyanins",
    "Color_intensity", "Hue", "OD280_OD315_of_diluted_wines", "Proline"
]

# Load the CSV file
df = pd.read_csv("wine/wine.data", names=column_names)

# Display the first few rows
print(df.head())

   Class  Alcohol  Malicacid   Ash  Alcalinity_of_ash  Magnesium  \
0      1    14.23       1.71  2.43               15.6        127   
1      1    13.20       1.78  2.14               11.2        100   
2      1    13.16       2.36  2.67               18.6        101   
3      1    14.37       1.95  2.50               16.8        113   
4      1    13.24       2.59  2.87               21.0        118   

   Total_phenols  Flavanoids  Nonflavanoid_phenols  Proanthocyanins  \
0           2.80        3.06                  0.28             2.29   
1           2.65        2.76                  0.26             1.28   
2           2.80        3.24                  0.30             2.81   
3           3.85        3.49                  0.24             2.18   
4           2.80        2.69                  0.39             1.82   

   Color_intensity   Hue  OD280_OD315_of_diluted_wines  Proline  
0             5.64  1.04                          3.92     1065  
1             4.38  1.05        

In [3]:
# Remove rows where Class is 3
df = df[df["Class"] != 3]

# Display the first few rows to verify
print(df.tail())

     Class  Alcohol  Malicacid   Ash  Alcalinity_of_ash  Magnesium  \
125      2    12.07       2.16  2.17               21.0         85   
126      2    12.43       1.53  2.29               21.5         86   
127      2    11.79       2.13  2.78               28.5         92   
128      2    12.37       1.63  2.30               24.5         88   
129      2    12.04       4.30  2.38               22.0         80   

     Total_phenols  Flavanoids  Nonflavanoid_phenols  Proanthocyanins  \
125           2.60        2.65                  0.37             1.35   
126           2.74        3.15                  0.39             1.77   
127           2.13        2.24                  0.58             1.76   
128           2.22        2.45                  0.40             1.90   
129           2.10        1.75                  0.42             1.35   

     Color_intensity   Hue  OD280_OD315_of_diluted_wines  Proline  
125             2.76  0.86                          3.28      378  
126 

Implement a logistic regression baseline

In [4]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

In [5]:
y = df['Class']
X = df.drop(columns=['Class'])

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

In [6]:
model = LogisticRegression(max_iter=700)
model.fit(X_train, y_train)

In [7]:
y_pred = model.predict(X_test)

# Evaluate performance
accuracy = accuracy_score(y_test, y_pred)
print(f"Baseline Logistic Regression Accuracy: {accuracy:.4f}")

Baseline Logistic Regression Accuracy: 0.9615


Now we need a custom class so we can directly modify the loss function... 

In [85]:
import numpy as np
from scipy.special import expit
from itertools import product
import multiprocessing
from joblib import Parallel, delayed

class LogisticRegression:
    def __init__(self, lr=0.001, max_iter=2000, tol=1e-4, fit_intercept=True, reg_lambda=0.01):
        self.lr = lr
        self.max_iter = max_iter
        self.tol = tol  # Tolerance for convergence (minimum improvement required)
        self.fit_intercept = fit_intercept
        self.reg_lambda = reg_lambda
        self.weight = None
        self.feature_means = None
        self.feature_stds = None

    def _normalize_features(self, X):
        """Normalizes features using z-score standardization."""
        if self.feature_means is None or self.feature_stds is None:
            self.feature_means = np.mean(X, axis=0)
            self.feature_stds = np.std(X, axis=0) + 1e-8  # Avoid division by zero
        
        return (X - self.feature_means) / self.feature_stds

    def _add_intercept(self, X):
        """Adds intercept term (bias) to feature matrix."""
        return np.c_[np.ones((X.shape[0], 1)), X]

    def _sigmoid(self, z):
        """Sigmoid activation function."""
        # Clip values to prevent overflow
        z = np.clip(z, -20, 20)
        return expit(z)  # Numerically stable sigmoid

    def _compute_loss(self, X, y):
        """Computes the binary cross-entropy loss with regularization."""
        m = len(y)
        h = self._sigmoid(X @ self.weight)
        # Add small epsilon to avoid log(0)
        epsilon = 1e-15
        h = np.clip(h, epsilon, 1 - epsilon)
        
        # Add L2 regularization term (excluding bias)
        reg_term = 0.5 * self.reg_lambda * np.sum(self.weight[1:] ** 2) / m
        
        # Compute binary cross-entropy
        loss = -np.mean(y * np.log(h) + (1 - y) * np.log(1 - h))
        
        return loss + reg_term

    def _compute_gradient(self, X, y):
        """Computes the gradient of the loss function with regularization."""
        m = len(y)
        h = self._sigmoid(X @ self.weight)
        
        # Basic gradient
        gradient = (X.T @ (h - y)) / m
        
        # Add L2 regularization
        reg_term = np.zeros_like(self.weight)
        reg_term[1:] = self.reg_lambda * self.weight[1:] / m  # Don't regularize bias
        
        return gradient + reg_term

    def _handle_class_imbalance(self, X, y):
        """Calculate class weights for imbalanced datasets."""
        unique_classes = np.unique(y)
        class_weights = {}
        
        for cls in unique_classes:
            class_weights[cls] = len(y) / (len(unique_classes) * np.sum(y == cls))
            
        # Create sample weights array
        sample_weights = np.ones(len(y))
        for cls in unique_classes:
            sample_weights[y == cls] = class_weights[cls]
            
        return sample_weights
    
    def fit(self, X, y, handle_imbalance=True, verbose=True):
        """Trains the logistic regression model using gradient descent."""
        # Convert inputs to numpy arrays if they're not already
        X = np.asarray(X)
        y = np.asarray(y)
        
        # Normalize features
        X_normalized = self._normalize_features(X)
        
        if self.fit_intercept:
            X_normalized = self._add_intercept(X_normalized)

        # Initialize weights with small random values
        np.random.seed(42)  # For reproducibility
        self.weight = np.random.randn(X_normalized.shape[1]) * 0.01
        
        # Handle class imbalance
        if handle_imbalance:
            sample_weights = self._handle_class_imbalance(X, y)
        else:
            sample_weights = np.ones(len(y))
            
        previous_loss = float('inf')
        converged = False
        
        for i in range(self.max_iter):
            # Compute weighted gradient
            h = self._sigmoid(X_normalized @ self.weight)
            gradient = (X_normalized.T @ ((h - y) * sample_weights)) / np.sum(sample_weights)
            
            # Add regularization
            reg_term = np.zeros_like(self.weight)
            reg_term[1:] = self.reg_lambda * self.weight[1:] / len(y)
            gradient += reg_term
            
            # Update weights
            new_weight = self.weight - self.lr * gradient
            
            # Calculate loss for convergence check
            current_loss = self._compute_loss(X_normalized, y)
            
            # Check convergence based on loss improvement
            if abs(previous_loss - current_loss) < self.tol:
                if verbose:
                    print(f'Converged at iteration {i}, loss: {current_loss:.6f}')
                converged = True
                break
                
            self.weight = new_weight
            previous_loss = current_loss
            
            # Optional: adaptive learning rate
            if i > 0 and i % 50 == 0:
                self.lr *= 0.9  # Reduce learning rate over time
                
        if not converged and verbose:
            print(f'Did not converge after {self.max_iter} iterations. Final loss: {current_loss:.6f}')
            
        return self

    def predict_proba(self, X):
        """Returns the probability predictions."""
        X = np.asarray(X)
        X_normalized = self._normalize_features(X)
        
        if self.fit_intercept:
            X_normalized = self._add_intercept(X_normalized)

        return self._sigmoid(X_normalized @ self.weight)

    def predict(self, X, threshold=0.5):
        """Predicts class labels (0 or 1) based on a threshold."""
        return (self.predict_proba(X) >= threshold).astype(int) + 1
    
    def score(self, X, y):
        """Calculate accuracy score."""
        y_pred = self.predict(X)
        return np.mean(y_pred == y)
    
    def get_feature_importance(self):
        """Returns the absolute weights as feature importance."""
        if self.fit_intercept:
            return np.abs(self.weight[1:])
        return np.abs(self.weight)
    
    
    def fit_stochastic_coordinate_descent(self, X, y, handle_imbalance=True, verbose=True):
        """Trains the logistic regression model using stochastic coordinate descent."""
        # Convert inputs to numpy arrays
        X = np.asarray(X)
        y = np.asarray(y)
        
        # Normalize features
        X_normalized = self._normalize_features(X)
        
        if self.fit_intercept:
            X_normalized = self._add_intercept(X_normalized)
        
        # Initialize weights
        np.random.seed(42)
        n_features = X_normalized.shape[1]
        self.weight = np.random.randn(n_features) * 0.01
        
        # Handle class imbalance
        if handle_imbalance:
            sample_weights = self._handle_class_imbalance(X, y)
        else:
            sample_weights = np.ones(len(y))
        
        previous_loss = float('inf')
        
        for i in range(self.max_iter):
            # Randomly select a coordinate (feature)
            j = np.random.randint(0, n_features)
            
            # Compute gradient for the selected coordinate
            h = self._sigmoid(X_normalized @ self.weight)
            gradient_j = np.sum((h - y) * sample_weights * X_normalized[:, j]) / np.sum(sample_weights)
            
            # Add regularization (except for bias term)
            if j > 0 or not self.fit_intercept:
                gradient_j += self.reg_lambda * self.weight[j] / len(y)
            
            # Update only the selected coordinate
            self.weight[j] -= self.lr * gradient_j
            
            # Check convergence every 10*n_features updates
            if i % (10 * n_features) == 0:
                current_loss = self._compute_loss(X_normalized, y)
                if abs(previous_loss - current_loss) < self.tol:
                    if verbose:
                        print(f'Converged at iteration {i}, loss: {current_loss:.6f}')
                    break
                previous_loss = current_loss
                
                # Adaptive learning rate
                if i > 0 and i % (50 * n_features) == 0:
                    self.lr *= 0.9
        
        if verbose and i == self.max_iter - 1:
            print(f'Did not converge after {self.max_iter} iterations. Final loss: {current_loss:.6f}')
        
        return self
    
    @staticmethod
    def grid_search(X, y, param_grid, cv=5, scoring='accuracy', n_jobs=-1, verbose=True, random_SGD=False):
       """
       Perform grid search to find optimal hyperparameters.
       
       Parameters:
       -----------
       X : array-like, shape (n_samples, n_features)
           Training data
       y : array-like, shape (n_samples,)
           Target values
       param_grid : dict
           Dictionary with parameter names as keys and lists of parameter values
       cv : int, default=5
           Number of cross-validation folds
       scoring : str, default='accuracy'
           Scoring metric ('accuracy', 'precision', 'recall', 'f1', 'auc')
       n_jobs : int, default=-1
           Number of jobs to run in parallel (-1 means using all processors)
       verbose : bool, default=True
           Whether to print progress
       use_stochastic_cd : bool, default=False
           Whether to use stochastic coordinate descent instead of batch gradient descent
           
       Returns:
       --------
       dict : Best parameters and corresponding score
       """
       # Convert inputs to numpy arrays if they're not already
       X = np.asarray(X)
       y = np.asarray(y)
       
       # Prepare parameter combinations
       param_names = list(param_grid.keys())
       param_values = list(param_grid.values())
       param_combinations = list(product(*param_values))
       
       # Split data into folds
       n_samples = len(y)
       indices = np.arange(n_samples)
       np.random.seed(42)  # For reproducible CV splits
       np.random.shuffle(indices)
       fold_sizes = np.full(cv, n_samples // cv, dtype=int)
       fold_sizes[:n_samples % cv] += 1
       
       current_idx = 0
       folds = []
       for fold_size in fold_sizes:
           fold_indices = indices[current_idx:current_idx + fold_size]
           folds.append(fold_indices)
           current_idx += fold_size
           
       # Define evaluation function for a single parameter combination
       def evaluate_params(params):
           param_dict = {param_names[i]: params[i] for i in range(len(param_names))}
           
           if verbose:
               print(f"Evaluating parameters: {param_dict}")
           
           scores = []
           for i in range(cv):
               # Split data
               test_idx = folds[i]
               train_idx = np.concatenate([folds[j] for j in range(cv) if j != i])
               
               X_train, X_test = X[train_idx], X[test_idx]
               y_train, y_test = y[train_idx], y[test_idx]
               
               # Train model with current parameters
               model = LogisticRegression(**param_dict)
               
               # Use stochastic coordinate descent if specified
               if random_SGD:
                   model.fit_stochastic_coordinate_descent(X_train, y_train, verbose=False)
               else:
                   model.fit(X_train, y_train, verbose=False)
               
               # Evaluate based on scoring metric
               if scoring == 'accuracy':
                   score = model.score(X_test, y_test)
               elif scoring == 'precision':
                   y_pred = model.predict(X_test)
                   score = np.sum((y_pred == 2) & (y_test == 2)) / np.sum(y_pred == 2) if np.sum(y_pred == 2) > 0 else 0
               elif scoring == 'recall':
                   y_pred = model.predict(X_test)
                   score = np.sum((y_pred == 2) & (y_test == 2)) / np.sum(y_test == 2) if np.sum(y_test == 2) > 0 else 0
               elif scoring == 'f1':
                   y_pred = model.predict(X_test)
                   precision = np.sum((y_pred == 2) & (y_test == 2)) / np.sum(y_pred == 2) if np.sum(y_pred == 2) > 0 else 0
                   recall = np.sum((y_pred == 2) & (y_test == 2)) / np.sum(y_test == 2) if np.sum(y_test == 2) > 0 else 0
                   score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
               elif scoring == 'auc':
                   try:
                       from sklearn.metrics import roc_auc_score
                       y_prob = model.predict_proba(X_test)
                       score = roc_auc_score(y_test == 2, y_prob)
                   except ImportError:
                       # Fallback if sklearn is not available
                       y_prob = model.predict_proba(X_test)
                       # Manual AUC calculation (simplified)
                       pos_scores = y_prob[y_test == 2]
                       neg_scores = y_prob[y_test != 2]
                       if len(pos_scores) == 0 or len(neg_scores) == 0:
                           score = 0.5
                       else:
                           n_pos = len(pos_scores)
                           n_neg = len(neg_scores)
                           score = sum(pos > neg for pos in pos_scores for neg in neg_scores) / (n_pos * n_neg)
               else:
                   raise ValueError(f"Unknown scoring metric: {scoring}")
               
               scores.append(score)
           
           mean_score = np.mean(scores)
           if verbose:
               print(f"Parameters {param_dict} - Mean {scoring}: {mean_score:.4f}")
           
           return param_dict, mean_score
       
       # For small grid sizes or when parallelism causes issues, run sequentially
       if len(param_combinations) <= 4 or n_jobs == 1:
           results = [evaluate_params(params) for params in param_combinations]
       else:
           # Run evaluations in parallel
           try:
               n_jobs = n_jobs if n_jobs > 0 else multiprocessing.cpu_count()
               results = Parallel(n_jobs=n_jobs)(
                   delayed(evaluate_params)(params) for params in param_combinations
               )
           except Exception as e:
               if verbose:
                   print(f"Parallel execution failed with error: {str(e)}")
                   print("Falling back to sequential execution...")
               results = [evaluate_params(params) for params in param_combinations]
       
       # Find best parameters
       best_params, best_score = max(results, key=lambda x: x[1])
       
       if verbose:
           opt_method = "stochastic coordinate descent" if random_SGD else "batch gradient descent"
           print(f"\nGrid Search Results (using {opt_method}):")
           print(f"Best parameters: {best_params}")
           print(f"Best {scoring}: {best_score:.4f}")
       
       return {
           'best_params': best_params,
           'best_score': best_score,
           'all_results': results
       }

In [87]:
# Define parameter grid
param_grid = {
    'lr': [0.1, 0.01, 0.001],
    'reg_lambda': [0.0, 0.01, 0.1, 1.0],
    'max_iter': [1000]
}

# Perform grid search
results = LogisticRegression.grid_search(X, y, param_grid, verbose=True, random_SGD=True)

# Create model with best parameters
best_model = LogisticRegression(**results['best_params'])
best_model.fit(X_train, y_train)


Grid Search Results (using stochastic coordinate descent):
Best parameters: {'lr': 0.001, 'reg_lambda': 0.0, 'max_iter': 1000}
Best accuracy: 0.7846
Did not converge after 1000 iterations. Final loss: 0.085889


<__main__.LogisticRegression at 0x144f20430>

In [76]:
y_pred = best_model.predict(X_test)
accuracy = best_model.score(X_test, y_test)

# Evaluate performance
# accuracy = accuracy_score(y_test, y_pred)
print(f"Custom Baseline Logistic Regression Accuracy: {accuracy}")


Custom Baseline Logistic Regression Accuracy: 0.8076923076923077


In [67]:
print(y_pred[:20])
print(y_test[:20].values)

[1 2 2 2 2 1 2 2 2 2 1 2 1 2 2 1 1 2 1 2]
[1 1 1 1 2 1 2 2 1 2 1 2 1 2 1 1 1 2 1 2]


In [66]:
print(y_test['Class'].tail)

KeyError: 'Class'