In [9]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statistics import mean

from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error


**Helper Functions**

In [10]:
import plotly.graph_objects as go

def plot_3d_regression(model, model_name, data_type = 'linear'):

    # Make Standard Data
    X, y = make_regression(
    n_samples=300,     # Number of samples
    n_features=2,      # Number of features
    noise=15,          # Add noise to make it realistic
    random_state=42    # Set seed for reproducibility
)

    # Make standard non-linear data
    if data_type.lower() == 'non-linear':
        y = y + 500 * np.sin(X[:, 0]) + 250 * np.cos(X[:, 1])

    # Make non-standard data
    if data_type == 'ugly':
        # Add non-linear transformations
        y = y + 500 * np.sin(X[:, 0]) * np.cos(X[:, 1]) \
              + 300 * (X[:, 0] ** 2) \
              - 200 * np.exp(-X[:, 1] ** 2)

        # Add random feature interactions
        y = y + 100 * (X[:, 0] * X[:, 1])

        # Add random noise to make it "uglier"
        y = y + np.random.normal(0, 150, size=y.shape)

    model.fit(X, y)


    # Create a mesh grid for the features
    x_grid, y_grid = np.meshgrid(np.linspace(min(X[:, 0]), max(X[:, 0]), 100),
                                np.linspace(min(X[:, 1]), max(X[:, 1]), 100))
    grid = np.vstack((x_grid.flatten(), y_grid.flatten())).T 

    predictions = model.predict(grid)

    # Create 3D scatter plot for training data
    scatter = go.Scatter3d(
        x=X[:, 0], y=X[:, 1], z=y,
        mode='markers', marker=dict(color='blue', size=5), name='Training Data'
    )

    # Create 3D surface plot for the regression surface
    surface = go.Surface(
        x=x_grid, y=y_grid, z=predictions.reshape(x_grid.shape), opacity=0.5, colorscale='Viridis', name='Regression Surface'
    )

    # Combine both traces into one figure
    fig = go.Figure(data=[scatter, surface])

    # Update layout for better visualization
    fig.update_layout(
        title=f'Training Data and Regression Surface for {model_name}',
        scene=dict(
            xaxis_title='Feature 1',
            yaxis_title='Feature 2',
            zaxis_title='Target'
        )
    )

    # Show plot
    fig.show()

# Linear Regression
## OLS

In [11]:
class ols_regression():

    # Initialize the class
    def __init__(self):
        pass       
    
    def fit(self, X, y):
        '''Fit the regression to the X data via the OLS equation'''

        # Add a leading colums of 1s to the X data to account for the bias term
        X = np.hstack((np.ones((X.shape[0], 1)), X))

        # Train the data on (X.T @ X)^(-1) @ X.T @ y
        ols = np.linalg.inv(X.T.dot(X)).dot(X.T.dot(y))
        self.coef = ols[1:]
        self.bias = ols[0]

    def predict(self, X):
        '''Predict new data with the trained coefficients and bias'''

        # Check if the X data is 1D and reshape if needed
        if X.ndim == 1:
                    X = X.reshape(-1, 1) 

        # Make predictions by dotting the new data with the coefficients and adding the bias
        self.predictions = X.dot(self.coef) + self.bias
        
        return self.predictions


In [27]:
ols = ols_regression()

plot_3d_regression(ols, model_name='OLS', data_type='ugly')


## Gradient Descent Regression

In [25]:
class GDRegression():
    def __init__(self, epochs, eta):
        '''Initialize the Gradient Descent Regression Class'''
        self.epochs = epochs
        self.eta = eta

    def fit(self, X, y, batch_size = "TBD"):
        '''Train the Gradient Descent Regression Class'''

        if batch_size == 'TBD':
            batch_size = X.shape[0]


        # Create random initialization for the bias and coefficients
        bias = np.random.random()
        coef = np.random.random(X.shape[1])

        # Iterate through each epoch
        for iter in range(self.epochs):
            
            indices = np.random.choice(X.shape[0], size=min(batch_size, len(y)), replace=False)
            X_batch = X[indices]
            y_batch = y[indices]

            # Make predictions for the X data being trained on
            y_hat = X_batch.dot(coef) + bias

            # Calculate the derrivative WRT bias and coef given the predicions
            derr_b = 2/X_batch.shape[0] * sum((y_hat - y_batch))
            derr_c = 2/X_batch.shape[0] * X_batch.T.dot(y_hat - y_batch)

            # Update the bias and the coef based on the derrivative
            bias = bias - derr_b * self.eta
            coef = coef - derr_c * self.eta

        # Finalize the bias and coef
        self.bias = bias
        self.coef = coef

    def predict(self, X):
        '''Predict new data given the learned bias and coef'''
        predictions = X.dot(self.coef) + self.bias
        return predictions

        

In [14]:
gd_reg = GDRegression(epochs=10000, eta=.01)
plot_3d_regression(gd_reg, 'Gradient Descent', data_type='ugly')

## KNN Regression

In [15]:
class KNNRegressor():
    def __init__(self, n_neighbors=5):
        '''Initialize the regressor with a defined number of nearest neighbors'''
        self.n_neighbors = n_neighbors

    def fit(self, X, y):
        '''Train the regressor by loading in all X and y data'''
        self.X = X
        self.y = y

    def predict(self, X):
        '''Make predictions based on the training data using euclidian distance'''
        predictions = np.empty(0)

        # For each test point...
        for test_point in X:
            # Calculate the distance between the test point and all training points
            distances = np.linalg.norm(self.X - test_point, axis=1)

            # Find the n_neighbors closest points
            closest_points_indices = np.argsort(distances)[:self.n_neighbors]

            # Use the mean of the closest points to formulate a predictions and append to the predictions array
            prediction = mean(self.y[closest_points_indices])
            predictions = np.append(predictions, prediction)

        return predictions

In [30]:
knn_regressor = KNNRegressor()

plot_3d_regression(knn_regressor, "K-Nearest Neighbors Regression", data_type='ugly')

In [17]:
class DecisionTreeRegressor():
    def __init__(self, max_depth=None):
        self.max_depth = max_depth

    # Function for calculating the MSE of a split
    def mse(self, y):
        return np.mean((y - np.mean(y)) ** 2)
    
    # Function to find the best split at any given point (based on MSE)
    def _best_split(self, X, y):
        self.best_mse = float('inf')
        self.best_feature = None
        self.best_split_value = None
        self.best_left_y = None
        self.best_right_y = None

        for feature_num in range(X.shape[1]):
            feature_values = np.unique(X[:, feature_num])
            for value in feature_values:
                left_index = X[:, feature_num] <= value
                right_index = X[:, feature_num] > value

                left_targets = y[left_index]
                right_targets = y[right_index]

                if len(left_targets) > 0 and len(right_targets) > 0:
                    left_mse = self.mse(left_targets)
                    right_mse = self.mse(right_targets)
                    total_average_mse = left_mse * len(left_targets)/len(y) + right_mse * len(right_targets)/len(y)

                    if total_average_mse < self.best_mse:
                        self.best_mse = total_average_mse
                        self.best_feature = feature_num
                        self.best_split_value = value
                        self.left_y = left_targets
                        self.right_y = right_targets

        return self.best_split_value, self.best_feature, self.left_y, self.right_y
    
    def _build_tree(self, X, y, depth=0):
        if len(np.unique(y)) == 1 or (self.max_depth is not None and depth >= self.max_depth):
            return np.mean(y)
        
        best_split_value, best_feature, left_y, right_y = self._best_split(X, y)

        if best_feature == None:
            return np.mean(y)
        
        left_index = X[:, best_feature] <= best_split_value
        right_index = X[:, best_feature] > best_split_value

        left_tree = self._build_tree(X[left_index], left_y)
        right_tree = self._build_tree(X[right_index], right_y)

        return {
            'feature': best_feature,
            'value': best_split_value,
            'left': left_tree,
            'right': right_tree
        }
    
    def _single_prediction(self, tree, x):
        if isinstance(tree, dict):
            if x[tree['feature']] < tree['value']:
                return self._single_prediction(tree['left'], x)
            else:
                return self._single_prediction(tree['right'], x)
        else:
            return tree
        
    def predict(self, X):
        predictions = np.array([self._single_prediction(self.tree, x) for x in X])
        return predictions

    def fit(self, X, y):
        self.tree = self._build_tree(X, y)

In [18]:
dt_reg = DecisionTreeRegressor()
plot_3d_regression(dt_reg, "Decision Tree", data_type='ugly')

## Random Forest Regression

In [22]:
class RandomForestRegressor():
    def __init__(self, n_estimators, max_depth = None):
        self.n_estimators = n_estimators
        self.max_depth = max_depth

    # Function for calculating the MSE of a split
    def mse(self, y):
        return np.mean((y - np.mean(y)) ** 2)
    
    # Function to find the best split at any given point (based on MSE)
    def _best_split(self, X, y):
        self.best_mse = float('inf')
        self.best_feature = None
        self.best_split_value = None
        self.best_left_y = None
        self.best_right_y = None

        # Randomly sample 1/3 of the total features
        n_features_to_consider = max(1, X.shape[1] // 3)
        random_feature_indices = np.random.choice(X.shape[1], size=n_features_to_consider, replace=False)

        # Only consider these sampled features for splitting
        feature_subset = X[:, random_feature_indices]

        for i, feature_num in enumerate(random_feature_indices):  # Map back to original feature indices
            feature_values = np.unique(feature_subset[:, i])
            for value in feature_values:
                # Boolean indices based on the feature subset
                left_index = feature_subset[:, i] <= value
                right_index = feature_subset[:, i] > value

                # Subset targets using these indices
                left_targets = y[left_index]
                right_targets = y[right_index]

                # Ensure both sides have samples
                if len(left_targets) > 0 and len(right_targets) > 0:
                    left_mse = self.mse(left_targets)
                    right_mse = self.mse(right_targets)
                    total_average_mse = left_mse * len(left_targets) / len(y) + right_mse * len(right_targets) / len(y)

                    if total_average_mse < self.best_mse:
                        self.best_mse = total_average_mse
                        self.best_feature = feature_num
                        self.best_split_value = value
                        self.left_y = left_targets
                        self.right_y = right_targets

        return self.best_split_value, self.best_feature, self.left_y, self.right_y

    
    def _build_tree(self, X, y, depth=0):
        if len(np.unique(y)) == 1 or (self.max_depth is not None and depth >= self.max_depth):
            return np.mean(y)
        
        best_split_value, best_feature, left_y, right_y = self._best_split(X, y)

        if best_feature == None:
            return np.mean(y)
        
        left_index = X[:, best_feature] <= best_split_value
        right_index = X[:, best_feature] > best_split_value

        left_tree = self._build_tree(X[left_index], left_y)
        right_tree = self._build_tree(X[right_index], right_y)

        return {
            'feature': best_feature,
            'value': best_split_value,
            'left': left_tree,
            'right': right_tree
        }

    def _single_prediction(self, tree, x):
        if isinstance(tree, dict):
            if x[tree['feature']] < tree['value']:
                return self._single_prediction(tree['left'], x)
            else:
                return self._single_prediction(tree['right'], x)
        else:
            return tree
        
    def predict(self, X):
        predictions = []
        for tree in self.trees:
            model_predictions = np.array([self._single_prediction(tree, x) for x in X])
            predictions.append(model_predictions)
    

        # Combine all the predictions of all the models and average across the models
        all_predictions = np.column_stack(predictions)
        all_predictions = np.mean(all_predictions, axis=1)

        return all_predictions
        

    def fit(self, X, y):
        models = []
        # Create n decision trees, trained with the feature selection and bootstrapping
        for n in range(self.n_estimators):
            random_row_indices = np.random.choice(len(X), len(X), replace=True)
            subset_X = X[random_row_indices]
            subset_y = y[random_row_indices]
            tree = self._build_tree(subset_X, subset_y)

            # Add each model to storage
            models.append(tree)

        # Save all the trained mini-trees
        self.trees = models
        

In [32]:
rf_regressor = RandomForestRegressor(10)
plot_3d_regression(rf_regressor, "Random Forest Regressor", "ugly")

## Gradient Boosting Regression

In [None]:
class GradientBoostRegressor():
    '''Class for Gradient Boosting Regression. This is a simplification, which utilizes ScikitLearn's DT class without
       chanaging the default hyperparameters, and also does not include early stopping, instead using all n_estimators'''
    def __init__(self, n_estimators = 100, models=None, learning_rate=0.1):
        self.n_estimators = n_estimators
        self.models = models
        self.learning_rate = learning_rate

    def fit(self, X, y):
        initial_predictions = np.full(y.shape, y.mean)
        self.current_gradient = -(y - initial_predictions)

        # Run through the weak learners, training on the residuals and updating
        for i in range(self.n_estimators):
            weak_learner = DecisionTreeRegressor()
            weak_residual_predictions = weak_learner.fit(X, self.current_errors).predict(X)


        