In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings("ignore")

# Read the Data
df = pd.read_csv("/content/insuranceFraud_Dataset.csv", na_values=["?"])

# Split data
# Prevention for data leakage
from sklearn.model_selection import train_test_split

X_train_set, X_test_set = train_test_split(df, test_size=0.25, random_state=42)


# Assign the target column
target_column = "fraud_reported"

# Training set
X_train_df = X_train_set.drop(target_column, axis=1)

# Training target column
y_train_df = X_train_set[target_column]

# Testing set
X_test_df = X_test_set.drop(target_column, axis=1)

# Testing target column
y_test_df = X_test_set[target_column]


In [2]:
# Categorical Columns in Training set
categorical_columns = X_train_df.select_dtypes(include=["object"]).columns

# Numerical columns in Training set
numerical_columns = X_train_df.select_dtypes(exclude=["object"]).columns

# Define ordinal columns
ordinal_columns = [
    "policy_csl",
    "policy_deductable",
    "insured_education_level",
    "incident_severity",
    "number_of_vehicles_involved",
    "bodily_injuries",
    "witnesses",
    "auto_year",
]

# Define categories for ordinal encoding
policy_csl = ["500/1000", "250/500", "100/300"]
policy_deductable = [2000, 1000, 500]
insured_education_level = [
    "PhD",
    "MD",
    "JD",
    "Masters",
    "College",
    "Associate",
    "High School",
]
incident_severity = [
    "Total Loss",
    "Major Damage",
    "Minor Damage",
    "Trivial Damage",
]
number_of_vehicles_involved = [4, 3, 2, 1]
bodily_injuries = [2, 1, 0]
witnesses = [3, 2, 1, 0]
auto_year = [
    2015,
    2014,
    2013,
    2012,
    2011,
    2010,
    2009,
    2008,
    2007,
    2006,
    2005,
    2004,
    2003,
    2002,
    2001,
    2000,
    1999,
    1998,
    1997,
    1996,
    1995,
]

# Define the Nominal columns
nominal_categorical_columns = [
    col
    for col in categorical_columns
    if col not in ordinal_columns and col != "policy_bind_date"
]


In [3]:
# Columns to Drop
columns_to_drop = [
    "months_as_customer",
    "policy_number",
    "insured_zip",
    "insured_hobbies",
    "incident_date",
    "incident_location",
    "total_claim_amount",
    "auto_model",
    "incident_city",
    "umbrella_limit",
]


In [4]:
def fraction_to_float(x):
    # Check if input is a string and contains a slash (indicating a fraction)
    if isinstance(x, str) and "/" in x:
        # Split the string into numerator and denominator, convert both to float
        num, denom = map(float, x.split("/"))
        # Return the result of dividing numerator by denominator
        return num / denom
    # If not a fraction, try to convert directly to float
    try:
        return float(x)
    # If conversion fails (e.g., invalid string or wrong type), return NaN
    except (ValueError, TypeError):
        return np.nan


def extract_year(X):
    X = pd.to_datetime(X.squeeze())
    return X.dt.year.to_frame()


def hour_to_period(X):
    hour = X.squeeze()
    if hasattr(hour, "values"):
        hour = hour.values
    bins = [0, 6, 12, 18, 24]
    labels = ["Night", "Morning", "Afternoon", "Evening"]
    return pd.DataFrame(
        pd.cut(
            hour, bins=bins, labels=labels, right=False, include_lowest=True
        )
    )


In [5]:
from sklearn.preprocessing import (
    FunctionTransformer,
    KBinsDiscretizer,
    StandardScaler,
    OneHotEncoder,
    OrdinalEncoder,
)
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import numpy as np


class KBinsDiscretizerPlusOne(BaseEstimator, TransformerMixin):
    """
    Custom discretizer that adds 1 to KBinsDiscretizer output.
    Converts continuous features into discrete bins with 1-based indexing.
    """

    def __init__(self, n_bins=5, encode="ordinal", strategy="uniform"):
        self.n_bins = n_bins
        self.encode = encode
        self.strategy = strategy
        self.kbd = None

    def fit(self, X, y=None):
        """Fit the discretizer to the data."""
        self.kbd = KBinsDiscretizer(
            n_bins=self.n_bins, encode=self.encode, strategy=self.strategy
        )
        self.kbd.fit(X)
        return self

    def transform(self, X):
        """Transform data by discretizing and adding 1 to each bin."""
        return (self.kbd.transform(X) + 1).astype(int)

    def get_feature_names_out(self, input_features=None):
        """Return feature names for transformed output."""
        if input_features is None:
            return [
                f"feature_{i}_binned" for i in range(self.kbd.n_features_in_)
            ]
        return [f"{feature}_binned" for feature in input_features]


class RandomSampleImputer(BaseEstimator, TransformerMixin):
    """
    Custom imputer that fills missing values by randomly sampling
    from existing non-null values in each column.
    """

    def __init__(self, random_state=None):
        self.random_state = random_state
        self.feature_values_ = {}

    def fit(self, X, y=None):
        """Learn the non-null values for each feature."""
        X = pd.DataFrame(X)
        for col in X.columns:
            non_null = X[col].dropna()
            self.feature_values_[col] = non_null.values
        return self

    def transform(self, X):
        """Fill missing values with random samples from non-null values."""
        X = pd.DataFrame(X).copy()
        rng = np.random.default_rng(self.random_state)

        for col in X.columns:
            mask = X[col].isna()
            if mask.any() and col in self.feature_values_:
                samples = rng.choice(
                    self.feature_values_[col], size=mask.sum()
                )
                X.loc[mask, col] = samples

        return X.values


In [6]:
# Extracts the year from date fields using a custom function.
# Discretizes the extracted years into bins and increments them by 1 (custom transformer).

date_pipeline = Pipeline(steps=[
    ("extract_year", FunctionTransformer(extract_year)),
    ("bin_year", KBinsDiscretizerPlusOne()),
    ("scaler", StandardScaler()),
])

# Fills missing numerical values using multivariate iterative imputation (like chained regression)
# Standardizes numerical values to mean 0 and standard deviation 1.

num_pipeline = Pipeline(
    steps=[
        ("imputer", IterativeImputer(max_iter=100, random_state=42)),
        ("scaler", StandardScaler()),
    ]
)

# Fills missing categorical values by randomly sampling from non-missing values.
# Converts ordered categorical variables into integer-encoded form, preserving order.

ordinal_pipeline = Pipeline(
    steps=[
        ("imputer", RandomSampleImputer()),
        (
            "ordinal_encoder",
            OrdinalEncoder(
                categories=[
                    policy_csl,
                    policy_deductable,
                    insured_education_level,
                    incident_severity,
                    number_of_vehicles_involved,
                    bodily_injuries,
                    witnesses,
                    auto_year,
                ]
            ),
        ),
        ("scaler", StandardScaler()),
    ]
)

# Handles missing nominal (unordered categorical) values via random sampling.
# Applies one-hot encoding (excluding the first category to avoid multicollinearity), while ignoring unknown categories.

nominal_pipeline = Pipeline(
    steps=[
        ("imputer", RandomSampleImputer()),
        (
            "one_hot",
            OneHotEncoder(
                handle_unknown="ignore", drop="first", sparse_output=False
            ),
        ),
    ]
)

hour_pipeline = Pipeline(
    steps=[
        (
            "hour_to_period",
            FunctionTransformer(hour_to_period, validate=False),
        ),
        (
            "one_hot",
            OneHotEncoder(sparse_output=False, handle_unknown="ignore"),
        ),
    ]
)


In [7]:
preprocessor = ColumnTransformer(
    transformers=[
        ("date", date_pipeline, ["policy_bind_date"]),
        ('drop_columns', 'drop', columns_to_drop),
        ("hour", hour_pipeline, ["incident_hour_of_the_day"]),
        ('numerical', num_pipeline, numerical_columns),
        ('ordinal', ordinal_pipeline, ordinal_columns),
        ('nominal', nominal_pipeline, nominal_categorical_columns)
    ],
    remainder='passthrough'
)

In [8]:
pipeline = Pipeline([("preprocessor", preprocessor)])

# Fit and transform
X_train_df_arr = pipeline.fit_transform(X_train_df)
X_test_df_arr = pipeline.transform(X_test_df)


In [9]:
train_arr = np.c_[X_train_df_arr, y_train_df.values.reshape(-1, 1)]
test_arr = np.c_[X_test_df_arr, y_test_df.values.reshape(-1, 1)]

In [10]:
import pickle
import os
def save_pkl(file_path, obj):
  try:
    dir_path = os.path.dirname(file_path)
    os.makedirs(dir_path, exist_ok=True)
    with open(file_path, 'wb') as f:
      pickle.dump(obj, f)
  except Exception as e:
    print(f"Error saving {file_path}: {e}")


In [11]:
save_pkl("/content/preprocessor.pkl", preprocessor)

# Model Trainer

In [12]:
from dataclasses import dataclass
import os
import sys
import logging


from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier
import xgboost as xgb
from sklearn.naive_bayes import GaussianNB, MultinomialNB
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import BaggingClassifier
from sklearn.linear_model import SGDClassifier

from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

In [13]:
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score

def evaluate_models(X_train, y_train, X_test, y_test, models, param_grids):
    """
    Uses GridSearchCV to perform hyperparameter tuning for each model in a provided dictionary,
    selecting the best parameters and evaluating model performance on both training and test sets.
    It returns a report of each model’s test accuracy after tuning.
    """
    try:
        report = {}
        for model_name in models.keys():

            # Get the model from dictionary
            model_class = models[model_name]

            # Instantiate the model
            model = model_class()
            # Get the parameter grid for this model
            para = param_grids[model_name]
            # Set up GridSearchCV
            gs = GridSearchCV(model, param_grid=para, cv=3)
            gs.fit(X_train, y_train)

            # Get the best model with best parameters
            best_model = gs.best_estimator_
            # Make predictions
            y_test_pred = best_model.predict(X_test)
            # Calculate test accuracy
            test_model_score = accuracy_score(y_test, y_test_pred)

            # Append to report
            report[model_name] = test_model_score

        return report
        print(report)

    except Exception as e:
        raise e


In [14]:
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import (
    RandomForestClassifier,
    GradientBoostingClassifier,
    AdaBoostClassifier,
    BaggingClassifier
)
from sklearn.naive_bayes import GaussianNB, MultinomialNB
import xgboost as xgb
from sklearn.preprocessing import LabelEncoder


def initiate_model_trainer(train_arr, test_arr):
    try:
        # Split Data
        X_train, y_train = train_arr[:, :-1], train_arr[:, -1]
        X_test, y_test = test_arr[:, :-1], test_arr[:, -1]

        # Assuming y_train and y_test are your target arrays
        le = LabelEncoder()
        y_train = le.fit_transform(y_train)
        y_test = le.transform(y_test)

        # Models
        models_dict = {
            "LogisticRegression": LogisticRegression,
            "SVC": SVC,
            "KNeighborsClassifier": KNeighborsClassifier,
            "DecisionTreeClassifier": DecisionTreeClassifier,
            "RandomForestClassifier": RandomForestClassifier,
            "GradientBoostingClassifier": GradientBoostingClassifier,
            "XGBClassifier": xgb.XGBClassifier,
            "GaussianNB": GaussianNB,
            "AdaBoostClassifier": AdaBoostClassifier,
            "BaggingClassifier": BaggingClassifier,
            "SGDClassifier": SGDClassifier
        }

        # Parameter grids (only active parameters shown; you can uncomment others as needed)
        param_grids = {
            "LogisticRegression": {
                "penalty": ["l1", "l2", "elasticnet", None],
                # "C": [0.01, 0.1, 1, 10, 100],
                # "solver": ["newton-cg", "lbfgs", "liblinear", "sag", "saga"],
                # "max_iter": [100, 200, 500]
            },
            "SVC": {
                "C": [0.1, 1, 10, 100],
                # "kernel": ["linear", "poly", "rbf", "sigmoid"],
                # "gamma": ["scale", "auto"] + [0.001, 0.01, 0.1, 1],
                # "degree": [2, 3, 4]
            },
            "KNeighborsClassifier": {
                "n_neighbors": [3, 5, 7, 9, 11],
                # "weights": ["uniform", "distance"],
                # "p": [1, 2]
            },
            "DecisionTreeClassifier": {
                "max_depth": [None, 5, 10, 20, 30],
                # "min_samples_split": [2, 5, 10],
                # "min_samples_leaf": [1, 2, 4],
                # "criterion": ["gini", "entropy"]
            },
            "RandomForestClassifier": {
                "n_estimators": [50, 100, 200],
                # "max_depth": [None, 10, 20, 30],
                # "min_samples_split": [2, 5, 10],
                # "min_samples_leaf": [1, 2, 4],
                # "max_features": ["sqrt", "log2"]
            },
            "GradientBoostingClassifier": {
                "n_estimators": [50, 100, 200],
                # "learning_rate": [0.01, 0.1, 0.2],
                # "max_depth": [3, 5, 7],
                # "min_samples_split": [2, 5, 10],
                # "min_samples_leaf": [1, 2, 4],
                # "max_features": ["sqrt", "log2"],
                # "subsample": [0.8, 0.9, 1.0]
            },
            "XGBClassifier": {
                "n_estimators": [50, 100, 200],
                # "learning_rate": [0.01, 0.1, 0.2],
                # "max_depth": [3, 5, 7],
                # "min_child_weight": [1, 3, 5],
                # "subsample": [0.8, 0.9, 1.0],
                # "colsample_bytree": [0.8, 0.9, 1.0],
                # "gamma": [0, 0.1, 0.2],
                # "reg_alpha": [0, 0.1, 0.5],
                # "reg_lambda": [0, 0.1, 0.5]
            },
            "GaussianNB": {
                "var_smoothing": [1e-9, 1e-8, 1e-7]
            },

            "AdaBoostClassifier": {
                "n_estimators": [50, 100, 200],
                # "learning_rate": [0.01, 0.1, 0.5],
                # "base_estimator": [None, DecisionTreeClassifier(max_depth=1)]
            },
            "BaggingClassifier": {
                "n_estimators": [10, 50, 100],
                # "max_samples": [0.5, 0.7, 1.0],
                # "max_features": [0.5, 0.7, 1.0],
                # "base_estimator": [None, DecisionTreeClassifier(max_depth=2)]
            },
            "SGDClassifier": {
                "loss": ["hinge", "log_loss", "modified_huber"],
                # "penalty": ["l2", "l1", "elasticnet"],
                # "alpha": [0.0001, 0.001, 0.01],
                # "l1_ratio": [0, 0.15, 0.5, 1.0],
                # "max_iter": [1000, 2000],
                # "learning_rate": ["constant", "optimal", "invscaling"],
                # "eta0": [0.01, 0.1]
            }
        }

        # Now use models_dict and param_grids for model training and tuning
        model_report: dict=evaluate_models(X_train, y_train, X_test, y_test, models_dict, param_grids)

        # Get the best model
        best_model_score = max(sorted(model_report.values()))

        # Get the best model name
        best_model_name = list(model_report.keys())[
            list(model_report.values()).index(best_model_score)
        ]
        best_model = models_dict[best_model_name]

        # Model Threshould
        if best_model_score < 0.6:
            raise ("No best model found")
        logging.info("Best found model on both training and testing dataset")

        save_pkl("/content/best_model.pkl", best_model)
        print(best_model_score)
        print(best_model_name)
        print( model_report)


    except Exception as e:
        raise e


In [15]:
abc = initiate_model_trainer(train_arr, test_arr)
print(abc)

0.808
BaggingClassifier
{'LogisticRegression': 0.72, 'SVC': 0.732, 'KNeighborsClassifier': 0.708, 'DecisionTreeClassifier': 0.792, 'RandomForestClassifier': 0.728, 'GradientBoostingClassifier': 0.8, 'XGBClassifier': 0.784, 'GaussianNB': 0.268, 'AdaBoostClassifier': 0.752, 'BaggingClassifier': 0.808, 'SGDClassifier': 0.66}
None
