In [1]:
import pandas as pd
import numpy as np
import seaborn as sns 
import matplotlib.pyplot as plt 
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import r2_score

In [3]:
# Load the insurance dataset from the project-relative Data folder and
# preview its first three rows as the cell output.
data = pd.read_csv(filepath_or_buffer = "Data/insurance.csv")
data.head(n = 3)

Unnamed: 0,age,sex,bmi,children,smoker,region,charges
0,19,female,27.9,0,yes,southwest,16884.924
1,18,male,33.77,1,no,southeast,1725.5523
2,28,male,33.0,3,no,southeast,4449.462


In [4]:
# Features are every column except the target. "Unnamed: 0" is the row index
# that was written into the CSV when it was saved; it is only a row counter,
# so it is removed too (errors="ignore" keeps this working if the file is
# ever re-exported without that column).
X_raw = (
    data
    .drop(columns = ["charges"])
    .drop(columns = ["Unnamed: 0"], errors = "ignore")
)

# Regression target as a plain NumPy array.
y = data["charges"].to_numpy()

In [6]:
# Column groups are selected by dtype from the raw feature frame.
numeric_columns = X_raw.select_dtypes(include = "number").columns
categorical_columns = X_raw.select_dtypes(exclude = "number").columns

# Numeric branch: fill missing values with the column median, then standardise.
numeric_steps = Pipeline(steps = [
    ("imp", SimpleImputer(strategy = "median")),
    ("scaler", StandardScaler()),
])

# Categorical branch: fill missing values with the most frequent level, then
# one-hot encode into a dense array; levels unseen during fit are ignored.
categorical_steps = Pipeline(steps = [
    ("imp", SimpleImputer(strategy = "most_frequent")),
    ("ohe", OneHotEncoder(handle_unknown = "ignore", sparse_output= False)),
])

# Apply each branch to its own column group.
pre = ColumnTransformer(transformers = [
    ("num", numeric_steps, numeric_columns),
    ("cat", categorical_steps, categorical_columns),
])

In [7]:
# NOTE(review): the preprocessor is fitted on every row here, before any
# cross-validation split, so imputation/scaling statistics are computed from
# rows that later serve as validation data. Consider fitting it per fold.
X = pre.fit_transform(X_raw)

# Densify only when the transformer returned something that can be densified
# (e.g. a SciPy sparse matrix); a NumPy array has no `toarray` and is kept.
if hasattr(X, "toarray"):
    X = X.toarray()

In [8]:
class LinearRegression2:
    """Linear regression trained by gradient descent on the mean squared error.

    Parameters
    ----------
    iterations : int
        Number of passes over the training data performed by ``fit``.
    learning_rate : float
        Step size applied to every parameter update.
    gradient_descent : str
        ``'batch'`` makes one update per pass from the gradient averaged over
        all rows; ``'stochastic'`` makes one update per row, visiting the rows
        in their stored order.

    Attributes
    ----------
    betas : np.ndarray or None
        Feature coefficients; ``None`` until ``initialize``/``fit`` is called.
    intercept : float
        Bias term.
    train_loss : list
        Mean squared error after each pass of the most recent ``fit``.
    val_score : list
        R^2 per fold from the most recent ``cross_validate``.
    all_train_loss : list
        Per-fold ``train_loss`` histories from the most recent ``cross_validate``.
    """

    # Update rules understood by compute_gradients_update_weights.
    _MODES = ("batch", "stochastic")

    def __init__(self, iterations : int = 100, learning_rate : float = 0.001, gradient_descent : str = 'batch'):
        self.iterations = iterations
        self.learning_rate = learning_rate
        self.gradient_descent = gradient_descent
        self.betas = None
        self.intercept = 0.0
        self.train_loss = []
        self.val_score = []
        self.all_train_loss = []

    def initialize(self, n_features):
        """Reset the coefficients (one per feature) and the intercept to zero."""
        self.betas = np.zeros(n_features, dtype = float)
        self.intercept = 0.0

    def compute_gradients_update_weights(self, x : np.ndarray, y : np.ndarray):
        """Run one pass of the configured update rule over ``(x, y)``.

        Parameters
        ----------
        x : array-like, 2-D
            Feature matrix, one row per sample.
        y : array-like, 1-D
            Targets aligned with the rows of ``x``.

        Returns
        -------
        LinearRegression2
            ``self``, with ``betas`` and ``intercept`` updated in place.

        Raises
        ------
        ValueError
            If ``gradient_descent`` is neither ``'batch'`` nor ``'stochastic'``.
        """
        x = np.asarray(x, dtype = float)
        y = np.asarray(y, dtype = float)

        if self.gradient_descent == 'batch':
            y_pred = x @ self.betas + self.intercept

            # d(squared error)/d(prediction) for every row.
            der_loss_wrt_preds = 2 * (y_pred - y)
            # d(prediction)/d(betas) is x, so the chain rule gives x.T @ (...),
            # averaged over the rows.
            betas_gradients = (x.T @ der_loss_wrt_preds) / x.shape[0]
            intercept_gradient = np.mean(der_loss_wrt_preds)

            self.betas -= self.learning_rate * betas_gradients
            self.intercept -= self.learning_rate * intercept_gradient

        elif self.gradient_descent == 'stochastic':
            # One update per sample, in row order (no shuffling).
            for row in range(x.shape[0]):
                y_pred = x[row] @ self.betas + self.intercept

                der_loss_wrt_pred = 2 * (y_pred - y[row])

                betas_gradients = der_loss_wrt_pred * x[row]
                intercept_gradient = der_loss_wrt_pred

                self.betas -= self.learning_rate * betas_gradients
                self.intercept -= self.learning_rate * intercept_gradient

        else:
            # Previously an unknown mode fell through both branches and the
            # model silently stayed at zero.
            raise ValueError(
                f"gradient_descent must be one of {self._MODES}, got {self.gradient_descent!r}"
            )

        return self

    def predict(self, x : np.ndarray):
        """Return ``x @ betas + intercept`` for each row of ``x``."""
        x = np.asarray(x, dtype = float)
        y_pred = x @ self.betas + self.intercept

        return y_pred

    def track_loss(self, x : np.ndarray, y : np.ndarray):
        """Return the mean squared error of the current parameters on ``(x, y)``."""
        x = np.asarray(x, dtype = float)
        y = np.asarray(y, dtype = float)

        y_pred = self.predict(x)
        loss = np.sum((y - y_pred)**2) / x.shape[0]

        return loss

    def fit(self, x_train : np.ndarray, y_train : np.ndarray):
        """Train from zero-initialised parameters for ``iterations`` passes.

        Parameters
        ----------
        x_train : array-like, 2-D
            Training feature matrix.
        y_train : array-like, 1-D
            Training targets.

        Returns
        -------
        list
            Mean squared error on the training data after each pass
            (the same list that is stored in ``train_loss``).

        Raises
        ------
        ValueError
            If ``x_train`` is not 2-D, if the row counts differ, or if
            ``gradient_descent`` is not a supported mode.
        """
        x_train = np.asarray(x_train, dtype = float)
        y_train = np.asarray(y_train, dtype = float)

        if x_train.ndim != 2:
            raise ValueError("x_train must be a 2-D array of shape (n_samples, n_features)")

        if x_train.shape[0] != len(y_train):
            raise ValueError("x_train and y_train must have the same amount of rows")

        self.initialize(x_train.shape[1])

        self.train_loss = []

        for _ in range(self.iterations):
            self.compute_gradients_update_weights(x_train, y_train)
            self.train_loss.append(self.track_loss(x_train, y_train))

        return self.train_loss

    def cross_validate(self, x, y, n_splits : int = 5, random_state : int = 42):
        """Score the current hyper-parameters with shuffled K-fold cross-validation.

        A fresh model with this instance's ``learning_rate``, ``iterations``
        and ``gradient_descent`` is trained on each training split and scored
        with R^2 on the matching validation split. This instance's own
        ``betas``/``intercept`` are not modified.

        Parameters
        ----------
        x : array-like, 2-D
            Feature matrix.
        y : array-like, 1-D
            Targets aligned with the rows of ``x``.
        n_splits : int, default 5
            Number of folds.
        random_state : int, default 42
            Seed for the fold shuffling.

        Returns
        -------
        list
            R^2 on the validation split of each fold (also stored in
            ``val_score``); the per-fold training-loss histories are stored in
            ``all_train_loss``.

        Raises
        ------
        ValueError
            If ``x`` and ``y`` have different numbers of rows.
        """
        x = np.asarray(x, dtype = float)
        y = np.asarray(y, dtype = float)

        if x.shape[0] != len(y):
            raise ValueError("x and y must have the same amount of rows")

        # Start from a clean slate so repeated calls do not pile up results,
        # mirroring how fit() resets train_loss.
        self.val_score = []
        self.all_train_loss = []

        cv = KFold(n_splits = n_splits, shuffle = True, random_state = random_state)

        for train_idx, val_idx in cv.split(x, y):
            # Index the arguments, not a notebook-level global.
            x_train, y_train = x[train_idx], y[train_idx]
            x_val, y_val = x[val_idx], y[val_idx]

            model = LinearRegression2(
                learning_rate = self.learning_rate,
                iterations = self.iterations,
                gradient_descent = self.gradient_descent
            )

            train_loss = model.fit(x_train, y_train)
            self.all_train_loss.append(train_loss)

            y_val_pred = model.predict(x_val)
            self.val_score.append(r2_score(y_val, y_val_pred))

        return self.val_score


In [20]:
# Model configuration used for the cross-validation report:
# batch gradient descent, 1000 passes, step size 0.01.
instance = LinearRegression2(
    iterations = 1000,
    learning_rate = 0.01,
    gradient_descent = "batch",
)

In [36]:
# Must match the number of folds cross_validate produces.
n_splits = 5

# Run cross-validation exactly once, BEFORE reading any results from the
# instance. Previously the loop read instance.all_train_loss first (IndexError
# on a fresh kernel) and re-ran the whole cross-validation on every iteration.
val_score = np.round(instance.cross_validate(X, y), 2)

# Report the n_splits most recent folds, so results left on the instance by
# any earlier call are not mixed in.
recent_losses = instance.all_train_loss[-n_splits:]
recent_scores = val_score[-n_splits:]

for fold, (fold_losses, fold_score) in enumerate(zip(recent_losses, recent_scores), start = 1):
    fold_avg_loss = np.mean(fold_losses)
    print(f"Fold {fold} avg loss: {fold_avg_loss}, r2 score: {fold_score}")

Fold 1 avg loss: 49084284.65365079, r2 score: 0.78
Fold 2 avg loss: 48160136.83907313, r2 score: 0.74
Fold 3 avg loss: 49144419.08123899, r2 score: 0.8
Fold 4 avg loss: 47906243.80218184, r2 score: 0.63
Fold 5 avg loss: 47684770.815754, r2 score: 0.75
