Multiple Linear regression & CART implementation

Packages needed for the project

In [8]:
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, r2_score

from sklearn.model_selection import train_test_split


PREPROCESSING DATA 

1. Firstly we load the data into Python.

In [9]:
redwine = pd.read_csv(r'C:\Users\JoAnN\OneDrive\Skrivbord\MASTER\Artificial intelligence for data science\Python\winequality-red.csv', sep=';')
whitewine = pd.read_csv(R'C:\Users\JoAnN\OneDrive\Skrivbord\MASTER\Artificial intelligence for data science\Python\winequality-white.csv', sep=';')

2. Then we add a new feature, type, and concatenate the two files. We add -1 for red wine and 1 for white.

In [10]:
redwine['type'] = 'red'
whitewine['type'] = 'white'

wine = pd.concat([redwine, whitewine], ignore_index=True)
wine['type'] = wine['type'].map({'red': -1, 'white': 1})

3. Then we wanna divide the data into target variable (y = quality) and features

In [14]:
X = wine.drop('quality', axis=1).values
y = wine['quality'].values

4. Divide the data into training and test data

In [16]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1)

5. The data is scaled (standardized), meaning it will have a mean of 0 and standard deviation of 1. $(X- \mu)/\sigma$, where mu and sigma is for each column of the data. 

In [18]:
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

6. Add the "bias" or $\theta_0$ to X, it will be a column of ones. OBS just run ones otherwise it gets incremented

In [20]:
X_train_scaled = np.c_[np.ones(X_train_scaled.shape[0]), X_train_scaled]
X_test_scaled = np.c_[np.ones(X_test_scaled.shape[0]), X_test_scaled]



MULTIPLE REGRESSION IMPLEMENTATION


 Gradient descent method

Firstly we initialize the weights to zero, "theta = np.zeros(n) "


 Then we calculate the gradient vector of the costfunction

$$ \nabla_{\theta} \text{MSE}(\boldsymbol{\theta}) = \frac{2}{m} \boldsymbol{X}^T(\boldsymbol{X} \boldsymbol{\theta} - \boldsymbol{y}) $$

Which corresponds to gradient = (2/m) * (X.T @ ( (X @ theta) - y ) )  in the code


Update the weigthts 

$$ \boldsymbol{\theta}^{\text{next step}} = \boldsymbol{\theta} - \eta \nabla_{\boldsymbol{\theta}} \text{MSE}(\boldsymbol{\theta}) $$

where $\eta$ is the learning rate. This corresponds to theta = theta - eta * gradient in the code

If the norm of the gradient is less than some epsilon (tolerance) we break the algorithm




In [22]:
def gradient_descent(X, y, eta, iterations, tol):
    m, n = X.shape  # Number of observations and features
    theta = np.zeros(n)  # Initialize weights
    for i in range(iterations):
        gradient = (2/m) * (X.T @ ( (X @ theta) - y ) )  #  gradient
        theta = theta - eta * gradient  # Uppdate weights
        
        if np.linalg.norm(gradient) < tol:
            print(f"Converged at iteration {i}")
            break
    return theta

FIND THE WEIGHTS 


2. Thereafter we set the learning rate, iteration and tolerance

In [24]:
learning_rate = 0.01
iterations = 1000
tol = 0.0001
theta_hat = gradient_descent(X_train_scaled, y_train, learning_rate, iterations, tol)

print("Theta:", theta_hat)

Theta: [ 5.82547623  0.04124991 -0.25129553 -0.01200046  0.20264535 -0.02590926
  0.09610745 -0.09064641 -0.15089026  0.0340718   0.10019783  0.34352783
 -0.10776505]


ALMOST the same results the standard formula

In [26]:
A = np.linalg.inv(np.dot(np.transpose(X_train_scaled),X_train_scaled)) 
B = np.dot(np.transpose(X_train_scaled),y_train)

beta = np.dot(A,B)
print(beta)

[ 5.82547624  0.09245014 -0.24936989 -0.01025915  0.29070699 -0.02290224
  0.08747713 -0.07533195 -0.29238194  0.06413342  0.10800171  0.28318006
 -0.15992148]


3. Make predictions $ \hat{y} = \mathbf{X}_{\text{test}} \cdot \boldsymbol{\theta} $


In [28]:
yhat = X_test_scaled @ theta_hat
print(yhat)

[5.62077808 5.54972371 5.42764299 ... 5.26335164 5.37407471 6.10852965]


Evaluation metrics, MAE, Accuracy, MEAM, CEMord

In [30]:
def calculate_mae(y_test, yhat):
    # Make sure lengths match
    if len(y_test) != len(yhat):
        raise ValueError("Length of y_test and yhat must be the same")
    
    # Calculate absolute differences
    abs_diff = [abs(y - yhat_i) for y, yhat_i in zip(y_test, yhat)]
    
    # Calculate mean
    mae = sum(abs_diff) / len(y_test)
    
    return mae

def calculate_accuracy(y_test, yhat):
    """
    Accuracy as defined in paper: (1/n)∑(1 if y_i = ŷ_i, 0 otherwise)
    """
    # Round predictions to nearest integer since wine quality is discrete
    yhat_rounded = np.round(yhat)
    correct = sum(1 for y, y_pred in zip(y_test, yhat_rounded) if y == y_pred)
    return correct / len(y_test)

def calculate_mae_m(y_test, yhat):
    """
    Macro Averaged MAE as defined in paper:
    MAEM = (1/#K)∑(MAEμ for each class)
    """
    # Get unique classes
    classes = np.unique(y_test)
    
    # Calculate MAE for each class
    class_maes = []
    for k in classes:
        # Get indices where true value is class k
        class_indices = y_test == k
        if sum(class_indices) > 0:  # Only if we have samples of this class
            mae_k = calculate_mae(y_test[class_indices], yhat[class_indices])
            class_maes.append(mae_k)
    
    # Return average of class MAEs
    return np.mean(class_maes)

def calculate_cem_ord(y_test, yhat):
    """
    CEMORD (Closeness Evaluation Measure) as defined in paper
    """
    # Round predictions since wine quality is discrete
    yhat_rounded = np.round(yhat)
    
    # Get unique classes
    classes = np.unique(y_test)
    n_classes = len(classes)
    
    # Create confusion matrix
    cm = np.zeros((n_classes, n_classes))
    for i, j in zip(y_test, yhat_rounded):
        i_idx = np.where(classes == i)[0][0]
        j_idx = np.where(classes == j)[0][0]
        cm[i_idx, j_idx] += 1
    
    # Calculate proximity for each pair of classes
    def prox(k1, k2):
        k1_count = sum(y_test == k1)
        between_count = sum((y_test >= min(k1, k2)) & (y_test <= max(k1, k2)))
        total = len(y_test)
        return -np.log2((k1_count/2 + between_count) / total)
    
    # Calculate numerator and denominator
    num = 0
    den = 0
    for i in classes:
        for j in classes:
            num += prox(i, j) * cm[np.where(classes == i)[0][0], np.where(classes == j)[0][0]]
            if i == j:
                den += prox(i, i) * sum(y_test == i)
    
    return num/den if den != 0 else 0

# Update your cross validation function to include all metrics
def evaluate_all_metrics(y_test, yhat):
    """
    Calculate all performance metrics
    """
    return {
        'mae': calculate_mae(y_test, yhat),
        'accuracy': calculate_accuracy(y_test, yhat),
        'mae_m': calculate_mae_m(y_test, yhat),
        'cem_ord': calculate_cem_ord(y_test, yhat)
    }

5 fold cross validation 

In [80]:
def cross_validation(X, y,func_in, model_params={}, k_folds=5,random_state = 42, learning_rate=0.01, iterations=1000, tol=0.0001):
    # Initialize metrics lists
    metrics_lists = {
        'mae': [], 'accuracy': [], 'mae_m': [], 'cem_ord': []
    }
    
    # Get total number of samples
    n_samples = len(X)
    
    # Create random permutation of indices
    np.random.seed(random_state)
    indices = np.random.permutation(n_samples)
    
    # Calculate fold size
    fold_size = n_samples // k_folds
    
    for fold in range(k_folds):
        # Calculate start and end indices for validation fold
        val_start = fold * fold_size
        val_end = (fold + 1) * fold_size if fold < k_folds - 1 else n_samples
        
        # Get validation indices for this fold
        val_indices = indices[val_start:val_end]
        
        # Get training indices (all indices except validation)
        train_indices = np.concatenate([indices[:val_start], indices[val_end:]])
        
        # Split data into training and validation using indices
        X_train, X_val = X[train_indices], X[val_indices]
        y_train, y_val = y[train_indices], y[val_indices]
        
        # Scale the data
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_val_scaled = scaler.transform(X_val)
        
        y_pred= func_in(X_train_scaled,X_val_scaled, y_train,**model_params)
                
        # Calculate all metrics
        fold_metrics = evaluate_all_metrics(y_val, y_pred)
        
        # Store metrics
        for metric_name, value in fold_metrics.items():
            metrics_lists[metric_name].append(value)
        
        # Print fold results
        print(f"\nFold {fold + 1} results:")
        for metric_name, value in fold_metrics.items():
            print(f"{metric_name}: {value:.4f}")
    
    # Calculate means and standard deviations
    results = {}
    print("\nOverall results:")
    for metric_name, values in metrics_lists.items():
        mean_val = np.mean(values)
        std_val = np.std(values)
        results[f'{metric_name}_mean'] = mean_val
        results[f'{metric_name}_std'] = std_val
        print(f"{metric_name}: {mean_val:.4f} (±{std_val:.4f})")
    
    return results


CART REGRESSION TREE IMPLEMENTATION

In [112]:
def CART_reg(X_train_scaled, X_val_scaled, y_train, max_depth=10):
    """CART model function with standardized interface for cross validation"""
    tree = RegressionCART(max_depth=max_depth)
    tree.fit(X_train_scaled, y_train)
    
    # Make predictions
    y_pred = tree.predict(X_val_scaled)
    return y_pred

class Node:
    """A Node with a constructor"""
    def __init__(self, feature_index: Optional[int] = None, threshold: Optional[float] = None,
                 left: Optional['Node'] = None, right: Optional['Node'] = None, value: Optional[float] = None):
        self.feature_index = feature_index  # Which feature index is used to split data
        self.threshold = threshold          # Threshold value for the split
        self.left = left                    # Left child node
        self.right = right                  # Right child node
        self.value = value                  # If node is leaf, store the predicted value(mean) in value

class RegressionCART:
    """ CART class"""
    
    def __init__(self, max_depth: int = 10, min_samples_split: int = 2, min_samples_leaf: int = 1):
        """ Constructor """
        self.max_depth = max_depth  # Maximum depth of tree, prevents overfitting
        self.min_samples_split = min_samples_split # Minimum number of samples to split a node
        self.min_samples_leaf = min_samples_leaf # Minimum number of samples required in a leaf
        self.root = None # Root of tree

    def fit(self, X: np.ndarray, y: np.ndarray) -> None:
        """ Help function, calls grow_tree on the whole dataset and stores it in root."""
        self.n_features = X.shape[1]  # Store number of features
        self.root = self._grow_tree(X, y)

    def _grow_tree(self, X: np.ndarray, y: np.ndarray, depth: int = 0) -> Node:
        """ Recursively grow the tree by splitting the data."""
        n_samples, n_features = X.shape
        
        # Check stopping criteria
        if (depth >= self.max_depth or 
            n_samples < self.min_samples_split or 
            len(np.unique(y)) == 1):
            return Node(value=np.mean(y))
        
        # Find the best split
        best_feature, best_threshold = self._find_best_split(X, y)
        
        # If no valid split is found, create a leaf node
        if best_feature is None:
            return Node(value=np.mean(y))
        
        # Create the split masks
        left_mask = X[:, best_feature] <= best_threshold
        
        # Split the data
        X_left, y_left = X[left_mask], y[left_mask]
        X_right, y_right = X[~left_mask], y[~left_mask]
        
        # Check minimum samples in leaves
        if len(y_left) < self.min_samples_leaf or len(y_right) < self.min_samples_leaf:
            return Node(value=np.mean(y))
        
        # Create node and recursively grow children
        left_child = self._grow_tree(X_left, y_left, depth + 1)
        right_child = self._grow_tree(X_right, y_right, depth + 1)
        
        return Node(
            feature_index=best_feature,
            threshold=best_threshold,
            left=left_child,
            right=right_child
        )
        
    def _calculate_mse(self, y: np.ndarray) -> float:
        """ Calculate the Mean Squared Error (MSE) for a node."""
        if len(y) == 0:
            return 0.0
        return np.mean((y - np.mean(y)) ** 2)

    def _find_best_split(self, X: np.ndarray, y: np.ndarray) -> Tuple[Optional[int], Optional[float]]:
        """ Find the best split that minimizes the weighted MSE."""
        best_mse = float('inf')
        best_feature = None
        best_threshold = None
        n_samples = len(y)
        
        # Iterate through all features
        for feature_idx in range(X.shape[1]):
            feature_values = X[:, feature_idx]
            thresholds = np.unique(feature_values)
            
            # Try all possible thresholds
            for threshold in thresholds:
                # Create masks for the split
                left_mask = feature_values <= threshold
                right_mask = ~left_mask
                
                # Skip if split creates empty nodes
                if not left_mask.any() or not right_mask.any():
                    continue
                
                # Get the split data
                y_left = y[left_mask]
                y_right = y[right_mask]
                
                # Calculate the weighted MSE
                n_left, n_right = len(y_left), len(y_right)
                mse = (n_left * self._calculate_mse(y_left) + 
                      n_right * self._calculate_mse(y_right)) / n_samples
                
                # Update best split if this is better
                if mse < best_mse:
                    best_mse = mse
                    best_feature = feature_idx
                    best_threshold = threshold
        
        return best_feature, best_threshold

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict target values for samples in X."""
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x: np.ndarray, node: Node) -> float:
        """Traverse the tree to make a prediction for a single sample."""
        if node.value is not None:
            return node.value
        
        if x[node.feature_index] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)

Multiple Linear regression function

In [None]:
def mul_reg(X_train_scaled,X_val_scaled, y_train, learning_rate, iterations, tol):
            # Add bias term
        X_train_scaled = np.c_[np.ones(X_train_scaled.shape[0]), X_train_scaled]
        X_val_scaled = np.c_[np.ones(X_val_scaled.shape[0]), X_val_scaled]
        
        # Train model
        theta = gradient_descent(X_train_scaled, y_train, learning_rate, iterations, tol)
        
        # Make predictions
        y_pred = X_val_scaled @ theta
        return y_pred
    

RUN MULTIPLE LIENAR REGRSSION MODEL

In [None]:
redwine = pd.read_csv(r'C:\Users\JoAnN\OneDrive\Skrivbord\MASTER\Artificial intelligence for data science\Python\winequality-red.csv', sep=';')
whitewine = pd.read_csv(R'C:\Users\JoAnN\OneDrive\Skrivbord\MASTER\Artificial intelligence for data science\Python\winequality-white.csv', sep=';')

redwine['type'] = 'red'
whitewine['type'] = 'white'

wine = pd.concat([redwine, whitewine], ignore_index=True)
wine['type'] = wine['type'].map({'red': -1, 'white': 1})

X = wine.drop('quality', axis=1).values
y = wine['quality'].values
mlr_params = {
    'learning_rate': 0.01,
    'iterations': 1000,
    'tol': 0.0001
}
results = cross_validation(X, y, mul_reg, model_params=mlr_params,k_folds=5)

RUN CART MODEL

In [121]:

cart_params = {
    'max_depth': 10
}
results_cart = cross_validation(X, y, CART_reg, model_params=cart_params)



Fold 1 results:
mae: 0.5630
accuracy: 0.5335
mae_m: 1.2995
cem_ord: 0.5474

Fold 2 results:
mae: 0.5581
accuracy: 0.5597
mae_m: 1.1217
cem_ord: 0.5679

Fold 3 results:
mae: 0.5647
accuracy: 0.5466
mae_m: 1.3387
cem_ord: 0.5714

Fold 4 results:
mae: 0.5537
accuracy: 0.5735
mae_m: 1.2508
cem_ord: 0.5758

Fold 5 results:
mae: 0.5329
accuracy: 0.5688
mae_m: 0.9836
cem_ord: 0.5869

Overall results:
mae: 0.5545 (±0.0115)
accuracy: 0.5564 (±0.0147)
mae_m: 1.1988 (±0.1301)
cem_ord: 0.5699 (±0.0129)
