In [1]:
import pandas as pd
import numpy as np
from pathlib import Path

Create `ClaimFrequency` (`ClaimNb / Exposure`) to get the expected number of claims per year.

In [2]:
# Load the training data once and derive the target: the number of claims
# divided by the exposure of the policy.
df = pd.read_csv('claims_train.csv')
df['ClaimFrequency'] = df['ClaimNb'] / df['Exposure']


Remove outliers: drop policies with an exposure below 0.01. Without this filter we would have `ClaimFrequency` values as high as 732.

In [3]:
# Keep only policies with an exposure of at least 0.01. The explicit .copy()
# makes the result an independent frame, so the column assignments in later
# cells do not operate on a slice of the original frame (which triggers
# pandas' SettingWithCopyWarning).
df = df[df['Exposure'] >= 0.01].copy()

Cap `ClaimFrequency` at its 99.5th percentile so that we avoid extreme values such as 366 claims in a year.

In [4]:
# Upper bound for the target: the 99.5th percentile of the current ClaimFrequency values.
cap = df['ClaimFrequency'].quantile(0.995)
# Values above the cap are replaced by the cap; smaller values are left unchanged.
df['ClaimFrequency'] = df['ClaimFrequency'].clip(upper=cap)
# Show the first rows as the cell output.
df.head()

Unnamed: 0,IDpol,ClaimNb,Exposure,Area,VehPower,VehAge,DrivAge,BonusMalus,VehBrand,VehGas,Density,Region,ClaimFrequency
0,2122523.0,0,0.43,D,7,18,36,95,B1,Regular,1054,R24,0.0
1,3173420.0,0,0.1,D,7,17,80,95,B2,Regular,598,R25,0.0
2,1188619.0,0,0.33,E,7,3,36,76,B6,Regular,4172,R82,0.0
3,31400.0,0,0.56,A,5,4,73,52,B13,Diesel,15,R24,0.0
4,3138755.0,0,0.27,E,8,0,37,50,B11,Diesel,3021,R53,0.0


In [5]:
# Features for the tree model: numeric predictors plus a one-hot encoding of Area.
num_cols = ['VehPower', 'VehAge', 'DrivAge', 'BonusMalus', 'Density']
cat_cols = ['Area']

df_dummies = pd.get_dummies(df[cat_cols], drop_first=False)

# Cast explicitly to float64: pandas >= 2.0 returns boolean dummy columns, and
# mixing them with the numeric columns would otherwise produce an object-dtype
# array, which makes every numpy operation in the tree much slower.
X_train = pd.concat([df[num_cols], df_dummies], axis=1).values.astype(np.float64)
y_train = df['ClaimFrequency'].values


class decisionTree:
    """Regression tree grown by exhaustive search for the split with the lowest MSE.

    Each node is a plain dict. Every node stores ``value`` (mean target of the
    samples that reached it) and ``is_leaf``; internal nodes additionally store
    ``feature_index``, ``threshold``, ``left`` and ``right``. Samples with
    ``feature <= threshold`` go to ``left``, the others to ``right``.

    Parameters
    ----------
    min_samples_leaf : int
        A candidate split is rejected if either side would hold fewer samples.
    min_samples_split : int
        A node with fewer samples than this becomes a leaf.
    max_depth : int
        Nodes at this depth (root is depth 0) become leaves.
    """

    def __init__(self, min_samples_leaf=100, min_samples_split=20, max_depth=5):
        self.max_depth = max_depth
        self.min_samples_leaf = min_samples_leaf
        self.min_samples_split = min_samples_split
        self.tree = None  # root node dict; stays None until fit() succeeds

    def fit(self, X, y):
        """Grow the tree on ``X`` (n_samples, n_features) and targets ``y``.

        Returns
        -------
        self, so calls can be chained.

        Raises
        ------
        ValueError
            If ``X`` is not 2-D, is empty, or ``y`` has a different number of rows.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if X.ndim != 2:
            raise ValueError(f"X must be 2-D (n_samples, n_features), got shape {X.shape}")
        if y.ndim == 0 or y.shape[0] != X.shape[0]:
            raise ValueError(
                f"X and y must have the same number of samples, got {X.shape[0]} and {y.shape}"
            )
        if X.shape[0] == 0:
            raise ValueError("cannot fit a tree on an empty training set")
        self.tree = self.buildTree(X, y, 0)
        return self

    def predict(self, X):
        """Return one prediction per row of ``X`` as a 1-D numpy array.

        Raises
        ------
        RuntimeError
            If called before ``fit``.
        """
        if self.tree is None:
            raise RuntimeError("decisionTree.predict() called before fit()")
        X = np.asarray(X)
        return np.array([self.predict_row(row, self.tree) for row in X])

    def mse(self, y):
        """Mean squared deviation of ``y`` from its own mean."""
        return ((y - y.mean()) ** 2).mean()

    def buildTree(self, X, y, depth):
        """Recursively build and return the node dict for the samples ``X``, ``y``."""
        n_samples, n_features = X.shape
        node = {}
        node["value"] = float(y.mean())

        # Stopping rules: depth limit, too few samples to split, or a constant target.
        if (depth >= self.max_depth or
                n_samples < self.min_samples_split or
                np.unique(y).size == 1):
            node['is_leaf'] = True
            return node

        best_loss = np.inf
        best_thresh = None
        best_feat = None

        for feat_idx in range(n_features):
            values = X[:, feat_idx]
            unique_vals = np.unique(values)
            if unique_vals.size == 1:
                # A constant feature offers no split.
                continue
            # Candidate thresholds: midpoints between consecutive sorted unique values.
            thresholds = (unique_vals[:-1] + unique_vals[1:]) / 2
            for t in thresholds:
                left_mask = values <= t
                right_mask = ~left_mask

                n_left = left_mask.sum()
                n_right = right_mask.sum()

                if n_left < self.min_samples_leaf or n_right < self.min_samples_leaf:
                    continue

                # Size-weighted average of the two children's MSE.
                loss = (n_left * self.mse(y[left_mask]) +
                        n_right * self.mse(y[right_mask])) / n_samples

                # Strict '<' keeps the first split found among equally good ones.
                if loss < best_loss:
                    best_loss = loss
                    best_thresh = t
                    best_feat = feat_idx

        if best_feat is None:
            # No candidate satisfied min_samples_leaf.
            node['is_leaf'] = True
            return node

        node['is_leaf'] = False
        node['feature_index'] = best_feat
        node['threshold'] = best_thresh

        left_mask = X[:, best_feat] <= best_thresh
        right_mask = ~left_mask

        node['left'] = self.buildTree(X[left_mask], y[left_mask], depth + 1)
        node['right'] = self.buildTree(X[right_mask], y[right_mask], depth + 1)

        return node

    def predict_row(self, row, node):
        """Walk from ``node`` down to a leaf for one sample and return the leaf value."""
        while not node['is_leaf']:
            if row[node['feature_index']] <= node['threshold']:
                node = node['left']
            else:
                node = node['right']
        return node['value']


def rmse(y_true, y_pred):
    """Root-mean-squared error between targets and predictions.

    Both inputs are flattened to 1-D before subtracting, so a column vector
    (n, 1) compared with a flat vector (n,) is scored element-wise instead of
    being silently broadcast to an (n, n) matrix.
    """
    y_true = np.asarray(y_true, dtype=np.float64).ravel()
    y_pred = np.asarray(y_pred, dtype=np.float64).ravel()
    return np.sqrt(np.mean((y_true - y_pred) ** 2))

# Fit the scratch regression tree on the full training matrix.
tree = decisionTree(min_samples_leaf=10, min_samples_split=200, max_depth=12)
tree.fit(X_train, y_train)

# In-sample predictions and their overall error.
y_train_pred = tree.predict(X_train)
print('Training RMSE:', rmse(y_train, y_train_pred))


Training RMSE: 0.819263848833977


RMSE by area

In [6]:
# Reuse the in-sample predictions computed when the tree was trained instead
# of running the row-by-row predict loop over the whole training set again.
y_pred = y_train_pred

# rmse() is already defined in the cell that trains the tree; redefining it
# here would only shadow that definition.
areas = df['Area'].unique()

print("RMSE by Area:")
for a in sorted(areas):
    # Plain boolean array: rows of df line up positionally with y_train / y_pred.
    mask = (df['Area'] == a).to_numpy()
    area_rmse = rmse(y_train[mask], y_pred[mask])
    print(f"Area {a}: {area_rmse:.4f}")

RMSE by Area:
Area A: 0.7203
Area B: 0.7639
Area C: 0.7837
Area D: 0.8534
Area E: 0.8900
Area F: 1.0606


RMSE by density quartile

In [7]:
# Assign every policy to one of four quantile-based Density bins.
quartile_labels = ['Q1', 'Q2', 'Q3', 'Q4']
df['DensityQuartile'] = pd.qcut(df['Density'], 4, labels=quartile_labels)

# Report the tree's in-sample error separately for each bin.
print("\nRMSE by Density Quartile:")
for q in quartile_labels:
    mask = (df['DensityQuartile'] == q)
    quartile_rmse = rmse(y_train[mask], y_pred[mask])
    print(f"{q}: {quartile_rmse:.4f}")




RMSE by Density Quartile:
Q1: 0.7370
Q2: 0.7871
Q3: 0.8244
Q4: 0.9184


M2

In [11]:


ROOT = Path(".")  # adjust if needed
train_path = ROOT / "claims_train.csv"
test_path  = ROOT / "claims_test.csv"

df_train = pd.read_csv(train_path)
df_test  = pd.read_csv(test_path)

# Target variable: ClaimFrequency = ClaimNb / Exposure.
# The loop variable is deliberately not named `df`: the tree-model cells keep
# their training frame in a global called `df`, and looping with that name
# would silently rebind it to df_test.
for frame in (df_train, df_test):
    frame["ClaimFrequency"] = frame["ClaimNb"] / frame["Exposure"]

# Filter tiny exposures. .copy() gives independent frames so the clipping
# below assigns into them without SettingWithCopyWarning.
# NOTE(review): this threshold (> 1e-3) differs from the >= 0.01 filter used
# for the tree model above - confirm that the difference is intended.
min_exposure = 1e-3
df_train = df_train[df_train["Exposure"] > min_exposure].copy()
df_test  = df_test[df_test["Exposure"] > min_exposure].copy()

# Cap extreme frequencies. The cap is estimated on the training set only and
# then applied to both sets, so no test information enters preprocessing.
q = df_train["ClaimFrequency"].quantile(0.995)
df_train["ClaimFrequency"] = df_train["ClaimFrequency"].clip(upper=q)
df_test["ClaimFrequency"]  = df_test["ClaimFrequency"].clip(upper=q)

# ---- Choose features ----
# NOTE(review): Exposure is also the denominator of the target - confirm that
# using it as a predictor is intended.
num_cols = ["VehPower", "VehAge", "DrivAge", "BonusMalus", "Density", "Exposure"]
cat_cols = ["Area"]   # keep it simple, but you can add more later

# One-hot encode categorical features
train_dummies = pd.get_dummies(df_train[cat_cols], drop_first=False)
test_dummies  = pd.get_dummies(df_test[cat_cols],  drop_first=False)

# Align columns so train & test match exactly: keep the training columns and
# fill any category missing from the test set with 0.
train_dummies, test_dummies = train_dummies.align(test_dummies, join="left", axis=1, fill_value=0)

X_train_df = pd.concat([df_train[num_cols], train_dummies], axis=1)
X_test_df  = pd.concat([df_test[num_cols],  test_dummies],  axis=1)

y_train = df_train["ClaimFrequency"].values.astype(np.float64)
y_test  = df_test["ClaimFrequency"].values.astype(np.float64)

X_train = X_train_df.values.astype(np.float64)
X_test  = X_test_df.values.astype(np.float64)

# Standardize features (very important for NN). Mean and std come from the
# training set only; the small epsilon avoids division by zero for a
# constant column.
X_mean = X_train.mean(axis=0, keepdims=True)
X_std  = X_train.std(axis=0, keepdims=True) + 1e-8

X_train_std = (X_train - X_mean) / X_std
X_test_std  = (X_test  - X_mean) / X_std

X_train_std.shape, X_test_std.shape


((542410, 12), (135603, 12))

In [None]:
import numpy as np

class FFNNRegressor:
    """Feed-forward network with one ReLU hidden layer and a linear output.

    Trained with mini-batch gradient descent on the mean squared error, with an
    optional L2 penalty ``l2 * ||W||^2`` on both weight matrices (biases are
    not penalised).

    Parameters
    ----------
    input_dim : int
        Number of input features.
    hidden_dim : int
        Number of hidden units.
    lr : float
        Learning rate of the gradient step.
    epochs : int
        Number of passes over the training data.
    batch_size : int
        Number of samples per mini-batch.
    l2 : float
        L2 penalty coefficient; 0 disables the penalty.
    random_state : int
        Seed for the weight initialisation and for the per-epoch shuffling.
    verbose : bool
        If True, print the full-data MSE/RMSE every 10 epochs and at the end.
    """

    def __init__(
        self,
        input_dim,
        hidden_dim=32,
        lr=0.01,
        epochs=80,
        batch_size=2048,
        l2=0.0,
        random_state=0,
        verbose=True,
    ):
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.lr = lr
        self.epochs = epochs
        self.batch_size = batch_size
        self.l2 = l2
        self.verbose = verbose

        # Keep the seeded generator on the instance: it initialises the weights
        # here and also drives the shuffling in fit(), so `random_state` makes
        # the whole training run reproducible.
        self.rng = np.random.RandomState(random_state)

        # Gaussian weights scaled by 1/sqrt(fan_in); biases start at zero.
        self.W1 = self.rng.randn(input_dim, hidden_dim) / np.sqrt(input_dim)
        self.b1 = np.zeros((1, hidden_dim))

        self.W2 = self.rng.randn(hidden_dim, 1) / np.sqrt(hidden_dim)
        self.b2 = np.zeros((1, 1))

    @staticmethod
    def relu(z):
        """Element-wise max(0, z)."""
        return np.maximum(0.0, z)

    @staticmethod
    def relu_deriv(z):
        """Derivative of ReLU: 1.0 where z > 0, else 0.0."""
        return (z > 0.0).astype(np.float64)

    @staticmethod
    def mse(y_true, y_pred):
        """Mean squared error; inputs are reshaped to columns so (n,) and (n, 1) agree."""
        y_true = np.asarray(y_true, dtype=np.float64).reshape(-1, 1)
        y_pred = np.asarray(y_pred, dtype=np.float64).reshape(-1, 1)
        return np.mean((y_true - y_pred) ** 2)

    @staticmethod
    def rmse(y_true, y_pred):
        """Square root of :meth:`mse`."""
        return np.sqrt(FFNNRegressor.mse(y_true, y_pred))

    @staticmethod
    def r2_score(y_true, y_pred):
        """Coefficient of determination, 1 - SS_res / SS_tot.

        The ratio is undefined when ``y_true`` is constant (SS_tot == 0).
        """
        y_true = np.asarray(y_true, dtype=np.float64).reshape(-1)
        y_pred = np.asarray(y_pred, dtype=np.float64).reshape(-1)
        ss_res = np.sum((y_true - y_pred) ** 2)
        ss_tot = np.sum((y_true - y_true.mean()) ** 2)
        return 1.0 - ss_res / ss_tot

    def _forward(self, X):
        """
        X: (batch_size, input_dim)
        Returns (z1, a1, z2, y_pred): hidden pre-activation, hidden activation,
        output pre-activation, and the prediction (equal to z2, linear output).
        """
        z1 = X @ self.W1 + self.b1
        a1 = self.relu(z1)
        z2 = a1 @ self.W2 + self.b2
        y_pred = z2
        return z1, a1, z2, y_pred

    def fit(self, X, y):
        """
        Train the network in place.

        X: (N, D), y: (N,)
        Returns self so calls can be chained.
        Raises ValueError if X and y have a different number of samples.
        """
        X = np.asarray(X, dtype=np.float64)
        y = np.asarray(y, dtype=np.float64).reshape(-1, 1)
        N, D = X.shape
        assert D == self.input_dim, "input_dim mismatch"
        if y.shape[0] != N:
            raise ValueError(f"X has {N} samples but y has {y.shape[0]}")

        for epoch in range(self.epochs):
            # Shuffle with the instance generator, not the global numpy RNG.
            order = self.rng.permutation(N)
            X_shuffled = X[order]
            y_shuffled = y[order]

            for start in range(0, N, self.batch_size):
                X_batch = X_shuffled[start:start + self.batch_size]
                y_batch = y_shuffled[start:start + self.batch_size]

                z1, a1, z2, y_pred = self._forward(X_batch)

                # Gradient of the batch-mean squared error w.r.t. the output.
                n_batch = X_batch.shape[0]
                dL_dy = 2.0 * (y_pred - y_batch) / n_batch

                dL_dW2 = a1.T @ dL_dy
                dL_db2 = np.sum(dL_dy, axis=0, keepdims=True)

                # Back-propagate through the output layer and the ReLU.
                dL_da1 = dL_dy @ self.W2.T
                dL_dz1 = dL_da1 * self.relu_deriv(z1)

                dL_dW1 = X_batch.T @ dL_dz1
                dL_db1 = np.sum(dL_dz1, axis=0, keepdims=True)

                if self.l2 > 0.0:
                    # Gradient of l2 * ||W||^2 for each weight matrix.
                    dL_dW2 += 2.0 * self.l2 * self.W2
                    dL_dW1 += 2.0 * self.l2 * self.W1

                self.W2 -= self.lr * dL_dW2
                self.b2 -= self.lr * dL_db2
                self.W1 -= self.lr * dL_dW1
                self.b1 -= self.lr * dL_db1

            if self.verbose and (epoch % 10 == 0 or epoch == self.epochs - 1):
                _, _, _, y_pred_full = self._forward(X)
                loss = self.mse(y, y_pred_full)
                rmse_val = np.sqrt(loss)
                print(f"Epoch {epoch+1:3d}/{self.epochs} - MSE: {loss:.6f} - RMSE: {rmse_val:.4f}")

        return self

    def predict(self, X):
        """Return predictions for X (N, D) as a flat array of length N."""
        X = np.asarray(X, dtype=np.float64)
        _, _, _, y_pred = self._forward(X)
        return y_pred.ravel()


In [None]:
# The input layer is as wide as the standardized feature matrix.
input_dim = X_train_std.shape[1]

nn = FFNNRegressor(
    input_dim=input_dim, hidden_dim=32,
    lr=0.01, epochs=80, batch_size=2048,
    l2=1e-4, random_state=42, verbose=True,
)
nn.fit(X_train_std, y_train)

# Predictions on both splits, then the metrics computed from them.
y_pred_train, y_pred_test = nn.predict(X_train_std), nn.predict(X_test_std)

train_rmse = nn.rmse(y_train, y_pred_train)
test_rmse = nn.rmse(y_test, y_pred_test)
train_r2 = nn.r2_score(y_train, y_pred_train)
test_r2 = nn.r2_score(y_test, y_pred_test)

print("\nScratch NN (M2) performance:")
print(f"Train RMSE: {train_rmse:.4f}")
print(f"Test  RMSE: {test_rmse:.4f}")
print(f"Train R²:   {train_r2:.4f}")
print(f"Test  R²:   {test_r2:.4f}")


Epoch   1/80 - MSE: 0.804691 - RMSE: 0.8970
Epoch  11/80 - MSE: 0.789238 - RMSE: 0.8884
Epoch  21/80 - MSE: 0.788333 - RMSE: 0.8879
Epoch  31/80 - MSE: 0.787965 - RMSE: 0.8877
Epoch  41/80 - MSE: 0.787829 - RMSE: 0.8876
Epoch  51/80 - MSE: 0.787584 - RMSE: 0.8875
Epoch  61/80 - MSE: 0.787412 - RMSE: 0.8874
Epoch  71/80 - MSE: 0.787209 - RMSE: 0.8872
Epoch  80/80 - MSE: 0.787139 - RMSE: 0.8872

Scratch NN (M2) performance:
Train RMSE: 0.8872
Test  RMSE: 0.8881
Train R²:   0.0107
Test  R²:   0.0104
