# Insurance Cost Regression

Этот ноутбук реализует весь необходимый пайплайн, разбитый на понятные ячейки: подготовка данных, две реализации линейной регрессии (аналитическая и градиентный спуск), добавление регуляризации (Ridge) аналитически и численно, и сравнение моделей по MSE на тесте. Все регрессии реализованы через классы для удобного переиспользования.

**Файлы (ожидаются по путям):** `../datasets/insurance_train.csv`, `../datasets/insurance_test.csv`

In [None]:
# Блок 1: импорты
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import Optional, Tuple
from dataclasses import dataclass
from copy import deepcopy

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

np.random.seed(42)


In [None]:
# Блок 2: загрузка данных
train_path = '../datasets/insurance_train.csv'
test_path  = '../datasets/insurance_test.csv'

try:
    df_train = pd.read_csv(train_path)
    df_test  = pd.read_csv(test_path)
    print('Train shape:', df_train.shape)
    print('Test shape :', df_test.shape)
except Exception as e:
    print('Не удалось загрузить файлы по ожидаемым путям. Ошибка:', e)
    print('Проверьте, что файлы существуют и пути правильные.')

## 1. Подготовка данных
1. Проверка пропусков и выбросов
2. Приведение категориальных признаков к числовым
3. Парные корреляции признаков

In [None]:
# Блок 3: Простая проверка пропусков и первичная обработка
def basic_eda(df: pd.DataFrame, name: str='data'):
    print(f'== {name} ==')
    print('shape:', df.shape)
    print('\nПропуски по столбцам:')
    print(df.isnull().sum())
    print('\nОписательная статистика (числовые):')
    display(df.describe(include='all'))

basic_eda(df_train, 'train')
basic_eda(df_test, 'test')

In [None]:
# Блок 4: Обработаем категориальные признаки и проверим выбросы
def preprocess_basic(df: pd.DataFrame, fit_scaler: Optional[StandardScaler]=None):
    df = df.copy()
    # Бинарные маппинги
    if 'sex' in df.columns:
        df['sex'] = df['sex'].map({'male':0, 'female':1})
    if 'smoker' in df.columns:
        df['smoker'] = df['smoker'].map({'no':0, 'yes':1})
    # region -> one-hot если есть
    if 'region' in df.columns:
        region_dummies = pd.get_dummies(df['region'], prefix='region')
        df = pd.concat([df.drop(columns=['region']), region_dummies], axis=1)
    # X, y
    if 'charges' in df.columns:
        y = df['charges'].values.reshape(-1,1)
        X = df.drop(columns=['charges']).values.astype(float)
    else:
        y = None
        X = df.values.astype(float)
    if fit_scaler is None:
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        return X_scaled, y, scaler, df
    else:
        X_scaled = fit_scaler.transform(X)
        return X_scaled, y, fit_scaler, df

X_train_scaled, y_train, scaler, df_train_proc = preprocess_basic(df_train)
X_test_scaled, y_test, _, df_test_proc = preprocess_basic(df_test, fit_scaler=scaler)

print('Processed train shape:', X_train_scaled.shape)
print('Processed test shape :', X_test_scaled.shape)

In [None]:
# Блок 5: Проверка выбросов (IQR) и корреляции
def detect_outliers_iqr(df, col):
    q1 = df[col].quantile(0.25)
    q3 = df[col].quantile(0.75)
    iqr = q3 - q1
    lower = q1 - 1.5 * iqr
    upper = q3 + 1.5 * iqr
    return df[(df[col] < lower) | (df[col] > upper)][col]

numeric_cols = df_train.select_dtypes(include=[np.number]).columns.tolist()
print('Числовые столбцы:', numeric_cols)

for col in numeric_cols:
    out = detect_outliers_iqr(df_train, col)
    print(f'{col}: {len(out)} выбросов (IQR)')

corr = pd.DataFrame(df_train_proc.select_dtypes(include=[np.number])).corr()
if 'charges' in df_train.columns:
    corr_with_target = corr['charges'].abs().sort_values(ascending=False)
    print('\nТоп по корреляции с charges:')
    display(corr_with_target.head(10))
print('\nЧасть матрицы корреляций:')
display(corr.iloc[:8, :8])

## 2. Многомерная линейная регрессия
Реализуем классы регрессоров: аналитический и через градиентный спуск.

In [None]:
# Блок 6: Базовый регрессор
class BaseLinearRegressor:
    def __init__(self):
        self.w = None
        self.fitted = False

    def _add_bias(self, X):
        return np.hstack([np.ones((X.shape[0], 1)), X])

    def predict(self, X):
        Xb = self._add_bias(X)
        return Xb.dot(self.w).reshape(-1,1)

In [None]:
# Блок 7: Аналитическая лин. регрессия
class LinearAnalytic(BaseLinearRegressor):
    def fit(self, X, y):
        Xb = self._add_bias(X)
        XtX = Xb.T.dot(Xb)
        Xty = Xb.T.dot(y)
        try:
            w = np.linalg.solve(XtX, Xty)
        except np.linalg.LinAlgError:
            w = np.linalg.pinv(XtX).dot(Xty)
        self.w = w.reshape(-1,1)
        self.fitted = True
        return self

In [None]:
# Блок 8: Линейная регрессия (GD)
class LinearGD(BaseLinearRegressor):
    def __init__(self, lr=1e-2, n_iter=1000, tol=1e-6, verbose=False):
        super().__init__()
        self.lr = lr
        self.n_iter = n_iter
        self.tol = tol
        self.verbose = verbose
        self.loss_history = []

    def fit(self, X, y):
        Xb = self._add_bias(X)
        n, d = Xb.shape
        w = np.zeros((d,1))
        prev_loss = np.inf
        for it in range(self.n_iter):
            preds = Xb.dot(w)
            err = preds - y
            loss = (err**2).mean()
            grad = 2.0 * Xb.T.dot(err) / n
            w = w - self.lr * grad
            self.loss_history.append(loss)
            if self.verbose and it % (self.n_iter//10 + 1) == 0:
                print(f'it {it}, loss {loss:.6f}')
            if abs(prev_loss - loss) < self.tol:
                break
            prev_loss = loss
        self.w = w
        self.fitted = True
        return self

## 3. Добавление регуляризации (Ridge) — аналитически и через GD

In [None]:
# Блок 9: Ridge аналитически
class RidgeAnalytic(BaseLinearRegressor):
    def __init__(self, lam=1.0):
        super().__init__()
        self.lam = lam

    def fit(self, X, y):
        Xb = self._add_bias(X)
        n, d = Xb.shape
        L = np.eye(d)
        L[0,0] = 0.0
        XtX = Xb.T.dot(Xb)
        A = XtX + self.lam * L
        Xty = Xb.T.dot(y)
        try:
            w = np.linalg.solve(A, Xty)
        except np.linalg.LinAlgError:
            w = np.linalg.pinv(A).dot(Xty)
        self.w = w.reshape(-1,1)
        self.fitted = True
        return self

In [None]:
# Блок 10: Ridge через GD
class RidgeGD(BaseLinearRegressor):
    def __init__(self, lam=1.0, lr=1e-2, n_iter=1000, tol=1e-6, verbose=False):
        super().__init__()
        self.lam = lam
        self.lr = lr
        self.n_iter = n_iter
        self.tol = tol
        self.verbose = verbose
        self.loss_history = []

    def fit(self, X, y):
        Xb = self._add_bias(X)
        n, d = Xb.shape
        w = np.zeros((d,1))
        prev_loss = np.inf
        for it in range(self.n_iter):
            preds = Xb.dot(w)
            err = preds - y
            mse = (err**2).mean()
            penalty = (self.lam / n) * (w[1:]**2).sum()
            loss = mse + penalty
            grad = 2.0 * Xb.T.dot(err) / n
            grad[1:] += 2.0 * (self.lam / n) * w[1:]
            w = w - self.lr * grad
            self.loss_history.append(loss)
            if self.verbose and it % (self.n_iter//10 + 1) == 0:
                print(f'it {it}, loss {loss:.6f}')
            if abs(prev_loss - loss) < self.tol:
                break
            prev_loss = loss
        self.w = w
        self.fitted = True
        return self

## 4. Оценка обобщающей способности
Сравним: константную модель, аналитическую, GD и Ridge.

In [None]:
# Блок 11: Константная модель
class ConstantPredictor:
    def __init__(self):
        self.mu = None
    def fit(self, y):
        self.mu = y.mean()
        return self
    def predict(self, X):
        return np.ones((X.shape[0],1)) * self.mu

const = ConstantPredictor().fit(y_train)
pred_const = const.predict(X_test_scaled)
mse_const = mean_squared_error(y_test, pred_const)
print('MSE constant:', mse_const)

In [None]:
# Блок 12: Обучение всех моделей и сравнение
models = {
    'LinearAnalytic': LinearAnalytic().fit(X_train_scaled, y_train),
    'LinearGD': LinearGD(lr=0.1, n_iter=5000, tol=1e-7).fit(X_train_scaled, y_train),
    'RidgeAnalytic (lam=1.0)': RidgeAnalytic(lam=1.0).fit(X_train_scaled, y_train),
    'RidgeGD (lam=1.0)': RidgeGD(lam=1.0, lr=0.1, n_iter=3000).fit(X_train_scaled, y_train)
}

results = {}
for name, model in models.items():
    preds = model.predict(X_test_scaled)
    mse = mean_squared_error(y_test, preds)
    results[name] = mse

results['Constant'] = mse_const

print('\nMSE на тесте:')
for k,v in sorted(results.items(), key=lambda x: x[1]):
    print(f'{k:30s}: {v:.4f}')

In [None]:
# Блок 13: Loss plots (для GD-моделей)
plt.figure(figsize=(8,4))
if hasattr(models['LinearGD'], 'loss_history'):
    plt.plot(models['LinearGD'].loss_history, label='LinearGD')
if hasattr(models['RidgeGD (lam=1.0)'], 'loss_history'):
    plt.plot(models['RidgeGD (lam=1.0)'].loss_history, label='RidgeGD')
plt.yscale('log')
plt.xlabel('iters')
plt.ylabel('loss (log)')
plt.legend()
plt.grid(True)
plt.show()

### Конец

Файл сохранён в `/mnt/data/insurance_regression_notebook.ipynb`. Откройте и запустите ячейки. Если данные по другим путям — исправьте переменные `train_path`/`test_path`. 