<a href="https://colab.research.google.com/github/nullvoid-ky/introduction-to-machine-learning-and-deep-learning/blob/main/14_SMOTE_ALLMODEL_FROMSCRATCH_INIT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [22]:
# === Overview: import the libraries this notebook needs ===
# ===== Setup & Installs (Kaggle usually has most of these; safe to re-run) =====
!pip -q install kagglehub

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import List
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.metrics import accuracy_score, roc_auc_score
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.datasets import make_classification
from imblearn.ensemble import BalancedBaggingClassifier
from sklearn.metrics import accuracy_score, classification_report
import warnings
warnings.filterwarnings('ignore')


In [23]:
# === Overview: fetch the bankruptcy dataset through kagglehub and print where its files are stored ===
import kagglehub

# Download latest version; `path` holds the location returned by kagglehub.
path = kagglehub.dataset_download("utkarshx27/american-companies-bankruptcy-prediction-dataset")

print("Path to dataset files:", path)

Using Colab cache for faster access to the 'american-companies-bankruptcy-prediction-dataset' dataset.
Path to dataset files: /kaggle/input/american-companies-bankruptcy-prediction-dataset


In [24]:
from kagglehub import KaggleDatasetAdapter, load_dataset
import os

# Build the CSV location from the directory kagglehub returned in the previous
# cell instead of hard-coding the Colab/Kaggle mount point, so the notebook also
# runs where the dataset is cached somewhere else.
file_path = os.path.join(path, "american_bankruptcy.csv")
df = pd.read_csv(file_path)
print("Loaded shape:", df.shape)
print("Columns:\n", list(df.columns))
df.head()

Loaded shape: (78682, 21)
Columns:
 ['company_name', 'status_label', 'year', 'X1', 'X2', 'X3', 'X4', 'X5', 'X6', 'X7', 'X8', 'X9', 'X10', 'X11', 'X12', 'X13', 'X14', 'X15', 'X16', 'X17', 'X18']


Unnamed: 0,company_name,status_label,year,X1,X2,X3,X4,X5,X6,X7,...,X9,X10,X11,X12,X13,X14,X15,X16,X17,X18
0,C_1,alive,1999,511.267,833.107,18.373,89.031,336.018,35.163,128.348,...,1024.333,740.998,180.447,70.658,191.226,163.816,201.026,1024.333,401.483,935.302
1,C_1,alive,2000,485.856,713.811,18.577,64.367,320.59,18.531,115.187,...,874.255,701.854,179.987,45.79,160.444,125.392,204.065,874.255,361.642,809.888
2,C_1,alive,2001,436.656,526.477,22.496,27.207,286.588,-58.939,77.528,...,638.721,710.199,217.699,4.711,112.244,150.464,139.603,638.721,399.964,611.514
3,C_1,alive,2002,396.412,496.747,27.172,30.745,259.954,-12.41,66.322,...,606.337,686.621,164.658,3.573,109.59,203.575,124.106,606.337,391.633,575.592
4,C_1,alive,2003,432.204,523.302,26.68,47.491,247.245,3.504,104.661,...,651.958,709.292,248.666,20.811,128.656,131.261,131.884,651.958,407.608,604.467


In [25]:
# Model input columns: X1..X18 without X10, followed by the "year" column.
# NOTE(review): X10 exists in the CSV but is not listed here — confirm the omission is intentional.
FEATURES = [f"X{idx}" for idx in range(1, 19) if idx != 10] + ["year"]
# Column holding the class label.
TARGET = "status_label"
# Column holding the company identifier.
COMPANY = "company_name"

In [26]:
# Fail fast when a configured feature or the target column is absent from df.
missing = [name for name in [*FEATURES, TARGET] if name not in df.columns]
if len(missing) > 0:
    raise ValueError(f"❌ Missing columns: {missing}")

def normalize_status(x):
    """Convert one raw status label into a binary code.

    Parameters
    ----------
    x : Any
        Raw label. It is stringified, stripped and lower-cased before matching.

    Returns
    -------
    int or float
        0 for "alive" / "non-bankrupt" / "healthy", 1 for "failed" / "bankrupt",
        the integer itself when the text parses to exactly 0 or 1, and
        ``np.nan`` for missing or unrecognised values.
    """
    if pd.isna(x):
        return np.nan
    t = str(x).strip().lower()
    if t in {"alive", "non-bankrupt", "healthy"}:
        return 0
    if t in {"failed", "bankrupt"}:
        return 1
    # Numeric spellings such as "0", "1", "1.0". Only ValueError is caught so
    # that KeyboardInterrupt / SystemExit are no longer swallowed.
    try:
        value = float(t)
    except ValueError:
        return np.nan
    return int(value) if value in (0.0, 1.0) else np.nan

# Translate every raw label to 0/1; labels normalize_status cannot interpret come back as NaN.
y_norm = df[TARGET].map(normalize_status)
bad_mask = y_norm.isna()
if bad_mask.any():
    # Show the most frequent offending values, then drop those rows from df and y_norm.
    print("⚠️ Unknown labels found, removing...")
    print(df.loc[bad_mask, TARGET].value_counts().head())
    df = df[~bad_mask].copy()
    y_norm = y_norm[~bad_mask]

# Replace the target column with the integer-coded labels.
df[TARGET] = y_norm.astype(int)


In [27]:

# ======================
# From-Scratch Metrics
# ======================
from typing import Dict, Tuple
import numpy as np
import matplotlib.pyplot as plt

def _ensure_binary(y: np.ndarray) -> np.ndarray:
    y = np.asarray(y).ravel()
    unique = np.unique(y)
    if set(unique.tolist()) == {0,1}:
        return y.astype(int)
    mapping = {v:i for i, v in enumerate(sorted(unique))}
    return np.vectorize(mapping.get)(y).astype(int)

def log_loss_from_scratch(y_true: np.ndarray, y_prob: np.ndarray, eps: float=1e-15) -> float:
    """Mean binary cross-entropy between labels and predicted probabilities.

    Parameters
    ----------
    y_true : array-like
        Labels; passed through ``_ensure_binary``.
    y_prob : array-like
        Predicted probabilities; flattened and clipped to ``[eps, 1 - eps]``
        so the logarithms stay finite.
    eps : float
        Clipping margin.

    Returns
    -------
    float
        Negative mean log-likelihood.
    """
    labels = _ensure_binary(y_true)
    probs = np.clip(np.asarray(y_prob).ravel(), eps, 1 - eps)
    log_likelihood = labels * np.log(probs) + (1 - labels) * np.log(1 - probs)
    return float(-log_likelihood.mean())

def confusion_matrix_from_scratch(y_true: np.ndarray, y_pred: np.ndarray):
    """Count the four confusion-matrix cells for binary labels.

    Both inputs are passed through ``_ensure_binary`` first.

    Returns
    -------
    tuple of int
        ``(TP, FP, FN, TN)`` in that order.
    """
    actual = _ensure_binary(y_true)
    predicted = _ensure_binary(y_pred)
    actual_pos, actual_neg = actual == 1, actual == 0
    pred_pos, pred_neg = predicted == 1, predicted == 0
    true_pos = int(np.count_nonzero(actual_pos & pred_pos))
    false_pos = int(np.count_nonzero(actual_neg & pred_pos))
    false_neg = int(np.count_nonzero(actual_pos & pred_neg))
    true_neg = int(np.count_nonzero(actual_neg & pred_neg))
    return true_pos, false_pos, false_neg, true_neg

def basic_scores_from_scratch(y_true: np.ndarray, y_pred: np.ndarray) -> Dict[str, float]:
    """Derive threshold-based classification scores from the confusion matrix.

    Returns
    -------
    dict
        Keys ``accuracy``, ``precision``, ``sensitivity_recall``,
        ``specificity_tnr``, ``f1`` plus the raw counts ``TP``, ``FP``, ``FN``,
        ``TN``. Any ratio whose denominator is zero is reported as 0.0.
    """
    def _ratio(numerator, denominator):
        # A zero denominator yields 0.0 instead of raising ZeroDivisionError.
        return numerator / denominator if denominator > 0 else 0.0

    TP, FP, FN, TN = confusion_matrix_from_scratch(y_true, y_pred)
    precision = _ratio(TP, TP + FP)
    recall = _ratio(TP, TP + FN)  # sensitivity
    return {
        "accuracy": _ratio(TP + TN, TP + FP + FN + TN),
        "precision": precision,
        "sensitivity_recall": recall,
        "specificity_tnr": _ratio(TN, TN + FP),  # specificity
        "f1": _ratio(2 * precision * recall, precision + recall),
        "TP": TP, "FP": FP, "FN": FN, "TN": TN,
    }

def roc_curve_from_scratch(y_true: np.ndarray, y_prob: np.ndarray):
    """Compute ROC points for every distinct score threshold.

    A sample is counted as predicted-positive at threshold ``t`` when its
    score is ``>= t``, matching the ``y_prob >= threshold`` rule used by
    ``evaluate_from_scratch``.

    Parameters
    ----------
    y_true : array-like
        Labels; passed through ``_ensure_binary``.
    y_prob : array-like
        Scores or probabilities for the positive class.

    Returns
    -------
    (FPR, TPR, thresholds) : tuple of np.ndarray
        Arrays of equal length. ``thresholds`` starts at ``inf`` and then lists
        the distinct scores in decreasing order, so the curve runs from (0, 0)
        to the point where every sample is predicted positive. A rate whose
        class has no samples is reported as 0.0.
    """
    y = _ensure_binary(y_true)
    p = np.asarray(y_prob).ravel()
    order = np.argsort(-p, kind="stable")
    y = y[order]
    p = p[order]
    # inf first (nothing predicted positive), then each distinct score, descending.
    thresholds = np.r_[np.inf, np.unique(p)[::-1]]
    n_pos = int((y == 1).sum())
    n_neg = int((y == 0).sum())
    # Leading 0 lets a count of k selected samples index the cumulative totals directly.
    cum_pos = np.r_[0, np.cumsum(y == 1)]
    cum_neg = np.r_[0, np.cumsum(y == 0)]
    # -p is ascending, so side="right" yields the number of samples with p >= t.
    # (side="left" would count p > t and never reach the all-positive endpoint.)
    n_selected = np.searchsorted(-p, -thresholds, side="right")
    TPR = cum_pos[n_selected] / n_pos if n_pos > 0 else np.zeros(len(thresholds))
    FPR = cum_neg[n_selected] / n_neg if n_neg > 0 else np.zeros(len(thresholds))
    return np.asarray(FPR, dtype=float), np.asarray(TPR, dtype=float), thresholds

def auc_trapezoid(x: np.ndarray, y: np.ndarray) -> float:
    """Area under the curve ``y(x)`` by the trapezoidal rule.

    Both inputs are flattened, then the points are sorted by ``x``. The sort is
    stable, so points sharing the same ``x`` keep their input order and the
    result does not depend on the sort algorithm's tie-breaking.

    Returns
    -------
    float
        The integral of ``y`` over ``x``.
    """
    x = np.asarray(x, dtype=float).ravel()
    y = np.asarray(y, dtype=float).ravel()
    order = np.argsort(x, kind="stable")
    # np.trapz is deprecated in NumPy 2 in favour of np.trapezoid; use whichever exists.
    integrate = getattr(np, "trapezoid", None)
    if integrate is None:
        integrate = np.trapz
    return float(integrate(y[order], x[order]))

def evaluate_from_scratch(y_true, y_prob, threshold: float=0.5) -> Dict[str, float]:
    """Score probabilistic predictions with every from-scratch metric.

    Parameters
    ----------
    y_true : array-like
        Labels; passed through ``_ensure_binary``.
    y_prob : array-like
        Predicted positive-class probabilities.
    threshold : float
        Probabilities ``>= threshold`` are treated as class 1.

    Returns
    -------
    dict
        The output of ``basic_scores_from_scratch`` extended with ``log_loss``
        and ``roc_auc``.
    """
    labels = _ensure_binary(np.asarray(y_true))
    probs = np.asarray(y_prob).ravel()
    hard_preds = (probs >= threshold).astype(int)
    # Threshold-dependent scores first, then the two threshold-free ones.
    report = basic_scores_from_scratch(labels, hard_preds)
    report["log_loss"] = log_loss_from_scratch(labels, probs)
    roc_x, roc_y, _ = roc_curve_from_scratch(labels, probs)
    report["roc_auc"] = auc_trapezoid(roc_x, roc_y)
    return report

def plot_roc_from_scratch(y_true, y_prob, title="ROC Curve (from scratch)"):
    """Draw the ROC curve with its AUC in the legend, plus the diagonal reference line."""
    false_pos_rate, true_pos_rate, _ = roc_curve_from_scratch(y_true, y_prob)
    area = auc_trapezoid(false_pos_rate, true_pos_rate)
    _, ax = plt.subplots()
    ax.plot(false_pos_rate, true_pos_rate, label=f"AUC = {area:.4f}")
    # Dashed diagonal from (0, 0) to (1, 1).
    ax.plot([0, 1], [0, 1], linestyle="--")
    ax.set_xlabel("False Positive Rate")
    ax.set_ylabel("True Positive Rate (Recall)")
    ax.set_title(title)
    ax.legend()
    plt.show()

def plot_threshold_curves(y_true, y_prob, title="Threshold vs Metrics (from scratch)"):
    """Plot accuracy, precision, recall, specificity and F1 against the decision threshold.

    Thresholds are 101 evenly spaced values in [0, 1]. Log loss is drawn as
    well; it does not depend on the threshold, so it is computed once and
    appears as a horizontal line.

    Parameters
    ----------
    y_true : array-like
        Labels; passed through ``_ensure_binary``.
    y_prob : array-like
        Predicted positive-class probabilities.
    title : str
        Plot title prefix; a fixed explanatory suffix is appended.
    """
    y_true = _ensure_binary(y_true)
    probs = np.asarray(y_prob).ravel()
    thresholds = np.linspace(0,1,101)
    # Loop-invariant: log loss uses the probabilities only, never the threshold.
    loss = log_loss_from_scratch(y_true, probs)
    per_threshold = [
        basic_scores_from_scratch(y_true, (probs >= t).astype(int))
        for t in thresholds
    ]
    curves = [
        ("Accuracy", "accuracy"),
        ("Precision", "precision"),
        ("Recall (Sensitivity)", "sensitivity_recall"),
        ("Specificity (TNR)", "specificity_tnr"),
        ("F1-score", "f1"),
    ]
    plt.figure()
    for label, key in curves:
        plt.plot(thresholds, [scores[key] for scores in per_threshold], label=label)
    plt.plot(thresholds, [loss] * len(thresholds), label="Log Loss")
    plt.xlabel("Threshold")
    plt.ylabel("Score")
    plt.title(title + " — (Non-iterative models: ใช้กราฟเทียบ threshold แทน loss ต่อ epoch)")
    plt.legend()
    plt.show()

# Printed once the definitions above have executed, as a visible "cell ran" marker.
print("✅ From-scratch metric toolkit is ready. Use evaluate_from_scratch(y_true, y_prob).")


✅ From-scratch metric toolkit is ready. Use evaluate_from_scratch(y_true, y_prob).


In [28]:

from dataclasses import dataclass
from typing import Any, Optional, Dict
import numpy as np
from imblearn.ensemble import BalancedBaggingClassifier

@dataclass
class ImbalancePipeline:
    """Chain optional preprocessing and resampling in front of a classifier.

    ``fit`` applies ``preprocess.fit_transform`` and then
    ``sampler.fit_resample`` (each only when provided) before fitting
    ``model``. Prediction applies only ``preprocess.transform``; the sampler is
    never used at prediction time.
    """

    # Estimator to train; when None, a RandomForestClassifier is created in __post_init__.
    model: Any = None
    # Optional object exposing fit_resample(X, y).
    sampler: Any = None
    # Optional object exposing fit_transform(X, y) and transform(X).
    preprocess: Any = None
    # Seed passed to the default RandomForestClassifier only.
    random_state: int = 42

    def __post_init__(self):
        """Install the default classifier when no model was supplied."""
        if self.model is None:
            self.model = RandomForestClassifier(
                n_estimators=200, class_weight="balanced_subsample",
                random_state=self.random_state, n_jobs=-1
            )

    def fit(self, X, y):
        """Preprocess, resample, then fit the model. Returns ``self``."""
        X = np.asarray(X)
        y = np.asarray(y)
        if self.preprocess is not None:
            X = self.preprocess.fit_transform(X, y)
        if self.sampler is not None:
            # Resampling is applied to the training data only.
            X, y = self.sampler.fit_resample(X, y)
        self.model.fit(X, y)
        self._is_fitted = True
        return self

    def predict_proba(self, X):
        """Return one positive-class score per row of ``X``.

        Sources, in order of preference:
        1. ``model.predict_proba`` — the last probability column is returned.
        2. ``model.decision_function`` — margins squashed through a sigmoid.
        3. ``model.predict`` — hard labels cast to float.
        """
        X = np.asarray(X)
        if self.preprocess is not None:
            X = self.preprocess.transform(X)
        if hasattr(self.model, "predict_proba"):
            proba = self.model.predict_proba(X)
            # For the two-column case the last column is column 1.
            return proba[:, -1]
        if hasattr(self.model, "decision_function"):
            z = self.model.decision_function(X)
            return 1 / (1 + np.exp(-z))
        # Use the wrapped model's labels directly. Calling self.predict here
        # would call predict_proba again and recurse without end.
        return np.asarray(self.model.predict(X)).astype(float)

    def predict(self, X, threshold: float=0.5):
        """Return 0/1 labels: 1 where the positive-class score is ``>= threshold``."""
        probs = self.predict_proba(X)
        return (probs >= threshold).astype(int)

    def evaluate(self, X, y_true, threshold: float=0.5) -> Dict[str, float]:
        """Score predictions for ``X`` against ``y_true`` via ``evaluate_from_scratch``."""
        y_true = np.asarray(y_true)
        probs = self.predict_proba(X)
        scores = evaluate_from_scratch(y_true, probs, threshold=threshold)
        return scores

    def plot_roc(self, X, y_true, title: str="ROC (Pipeline)"):
        """Plot the ROC curve for ``X`` via ``plot_roc_from_scratch``."""
        probs = self.predict_proba(X)
        plot_roc_from_scratch(y_true, probs, title=title)

    def plot_threshold_curves(self, X, y_true, title: str="Threshold vs Metrics (Pipeline)"):
        """Plot metric-vs-threshold curves for ``X`` via the module-level ``plot_threshold_curves``."""
        probs = self.predict_proba(X)
        plot_threshold_curves(y_true, probs, title=title)



In [29]:
# ==============================
# Feature selection (X, y) + map target
# ==============================

# FEATURES is reused exactly as defined in an earlier cell; TARGET is restated here.
TARGET = "status_label"

# Stop early when df lacks any feature column or the target column.
missing = [name for name in [*FEATURES, TARGET] if name not in df.columns]
if len(missing) > 0:
    raise ValueError(f"❌ Missing columns in df: {missing}")

# Make sure the target column is integer type
df[TARGET] = df[TARGET].astype(int)

# Independent copies of the feature matrix and the label vector.
X, y = df[FEATURES].copy(), df[TARGET].copy()

print("✅ X,y ready.")
print("X shape:", X.shape, "| y counts:", dict(pd.Series(y).value_counts()))

✅ X,y ready.
X shape: (78682, 18) | y counts: {0: np.int64(73462), 1: np.int64(5220)}


In [30]:
from ml_from_scratch import (
    LogisticRegressionModel, DecisionTreeModel, RandomForestModel,
    NaiveBayesModel, SupportVectorMachineModel, PerceptronModel,
    MLPModel, ReducedClassifierModel, KMeanClustering,
    AgglomerativeClusteringModel
)

# Example hyper-parameter sets, one dict per model family (tweak as you like).
# NOTE(review): the keys presumably mirror keyword arguments of the ml_from_scratch
# model classes — confirm against that module.
lr_params = dict(lr=0.1, n_iter=2000)
dt_params = dict(max_depth=6)
rf_params = dict(n_estimators=60, max_depth=6, feature_ratio=0.7)
# "lambda" is a Python keyword, so this dict has to stay a literal.
svm_params = {"lambda": 0.001, "n_iter": 3000}
perc_params = dict(lr=0.1, n_iter=1000)
mlp_params = dict(hidden=32, lr=0.01, n_iter=1500)

