In [3]:
# regress_compare.py
# ---------------------------------------------------------
# Multivariate Linear Regression comparison:
# 1) Normal Equation
# 2) Batch Gradient Descent (BGD)
# 3) Stochastic Gradient Descent (SGD)
# 4) Mini-Batch Gradient Descent (MBGD)
#
# Output:
# - 1 grafik konvergensi (PNG)
# - 1 CSV berisi perbandingan waktu & MSE (test)
#
# Usage:
#   python3 regress_compare.py /path/to/student_exam_scores.csv
# ---------------------------------------------------------

import sys, os, time, math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

def main():
    if len(sys.argv) < 2:
        print("Usage: python3 regress_compare.py /path/to/student_exam_scores.csv")
        sys.exit(1)

    csv_path = sys.argv[1]
    if not os.path.exists(csv_path):
        raise FileNotFoundError(csv_path)

    # --------------
    # Load dataset
    # --------------
    df = pd.read_csv(csv_path)

    # --------------
    # Target detection
    # --------------
    cols = list(df.columns)
    target_candidates = [c for c in cols if c.lower() in
                         ["score","final_score","exam_score","final","target","y"]]
    y_col = target_candidates[0] if target_candidates else cols[-1]
    X_cols = [c for c in cols if c != y_col]

    X = df[X_cols].select_dtypes(include=[np.number]).copy()
    y = pd.to_numeric(df[y_col], errors="coerce")

    valid = ~(X.isna().any(axis=1) | y.isna())
    X = X.loc[valid].reset_index(drop=True)
    y = y.loc[valid].reset_index(drop=True)

    # --------------
    # Descriptive stats (optional print)
    # --------------
    print("\n[INFO] Features:", list(X.columns))
    print("[INFO] Target  :", y_col)
    print("\n[Descriptive] X head:\n", X.head())
    print("\n[Descriptive] y head:\n", y.head())

    # --------------
    # Train/Test split (80/20)
    # --------------
    rng = np.random.default_rng(42)
    idx = np.arange(len(X))
    rng.shuffle(idx)
    split = int(0.8*len(X))
    train_idx, test_idx = idx[:split], idx[split:]

    X_train, X_test = X.iloc[train_idx].copy(), X.iloc[test_idx].copy()
    y_train, y_test = y.iloc[train_idx].copy(), y.iloc[test_idx].copy()

    # --------------
    # Standardize features (fit on train)
    # --------------
    X_mean = X_train.mean()
    X_std = X_train.std(ddof=0).replace(0, 1.0)

    X_train_std = (X_train - X_mean) / X_std
    X_test_std  = (X_test  - X_mean) / X_std

    def add_bias(A: np.ndarray) -> np.ndarray:
        return np.c_[np.ones((A.shape[0], 1)), A]

    Xtr = add_bias(X_train_std.values)
    Xte = add_bias(X_test_std.values)
    ytr = y_train.values.reshape(-1, 1)
    yte = y_test.values.reshape(-1, 1)

    # Helpers
    def mse_cost(Xm, ym, theta):
        m = len(ym)
        err = Xm @ theta - ym
        return float((err.T @ err) / m)

    def grad(Xm, ym, theta):
        m = len(ym)
        return (2/m) * (Xm.T @ (Xm @ theta - ym))

    # 1) Normal Equation
    t0 = time.perf_counter()
    theta_ne = np.linalg.pinv(Xtr.T @ Xtr) @ Xtr.T @ ytr
    t_ne = time.perf_counter() - t0
    mse_ne = mse_cost(Xte, yte, theta_ne)

    # 2) BGD
    def batch_gradient_descent(Xm, ym, alpha=0.1, n_epochs=300):
        theta = np.zeros((Xm.shape[1], 1))
        history = []
        for _ in range(n_epochs):
            theta = theta - alpha * grad(Xm, ym, theta)
            history.append(mse_cost(Xm, ym, theta))
        return theta, history

    alphas = [0.001, 0.01, 0.05, 0.1, 0.2]
    best_alpha, best_hist, best_theta_bgd = None, None, None
    best_final_cost = math.inf
    t0 = time.perf_counter()
    for a in alphas:
        theta_try, hist_try = batch_gradient_descent(Xtr, ytr, alpha=a, n_epochs=300)
        if hist_try[-1] < best_final_cost:
            best_final_cost = hist_try[-1]
            best_alpha = a
            best_hist = hist_try
            best_theta_bgd = theta_try
    t_bgd = time.perf_counter() - t0
    mse_bgd = mse_cost(Xte, yte, best_theta_bgd)

    # 3) SGD
    def sgd(Xm, ym, n_epochs=50, t0_val=5.0, t1_val=50.0, shuffle=True):
        m, n = Xm.shape
        theta = np.zeros((n, 1))
        history = []
        iteration = 0
        rng_local = np.random.default_rng(123)

        def lr(t):
            return 1.0 / (t0_val + t1_val*t)

        for _ in range(n_epochs):
            indices = np.arange(m)
            if shuffle:
                rng_local.shuffle(indices)
            for i in indices:
                xi = Xm[i:i+1].T
                yi = ym[i:i+1]
                grad_i = 2 * (xi @ ((xi.T @ theta) - yi))
                eta = lr(iteration + 1)
                theta = theta - eta * grad_i
                iteration += 1
            history.append(mse_cost(Xm, ym, theta))
        return theta, history

    t0 = time.perf_counter()
    theta_sgd, hist_sgd = sgd(Xtr, ytr, n_epochs=50, t0_val=5.0, t1_val=50.0, shuffle=True)
    t_sgd = time.perf_counter() - t0
    mse_sgd = mse_cost(Xte, yte, theta_sgd)

    # 4) MBGD
    def mbgd(Xm, ym, batch_size=16, alpha=0.05, n_epochs=150, shuffle=True):
        m, n = Xm.shape
        theta = np.zeros((n, 1))
        history = []
        rng_local = np.random.default_rng(999)
        for _ in range(n_epochs):
            indices = np.arange(m)
            if shuffle:
                rng_local.shuffle(indices)
            for start in range(0, m, batch_size):
                end = min(start + batch_size, m)
                batch_idx = indices[start:end]
                Xb = Xm[batch_idx]
                yb = ym[batch_idx]
                theta = theta - alpha * grad(Xb, yb, theta)
            history.append(mse_cost(Xm, ym, theta))
        return theta, history

    batch_sizes = [8, 16, 32]
    alphas_mbgd = [0.01, 0.05, 0.1]
    best_cfg, best_hist_m, best_theta_m = None, None, None
    best_final_cost_m = math.inf
    t0 = time.perf_counter()
    for bs in batch_sizes:
        for a in alphas_mbgd:
            theta_try, hist_try = mbgd(Xtr, ytr, batch_size=bs, alpha=a, n_epochs=150)
            if hist_try[-1] < best_final_cost_m:
                best_final_cost_m = hist_try[-1]
                best_cfg = (bs, a)
                best_hist_m = hist_try
                best_theta_m = theta_try
    t_mbgd = time.perf_counter() - t0
    mse_mbgd = mse_cost(Xte, yte, best_theta_m)

    # --------------
    # Plot convergence (one chart)
    # --------------
    plt.figure()
    plt.plot(range(1, len(best_hist)+1), best_hist, label=f"BGD (alpha={best_alpha})")
    plt.plot(range(1, len(hist_sgd)+1), hist_sgd, label="SGD")
    plt.plot(range(1, len(best_hist_m)+1), best_hist_m,
             label=f"MBGD (bs={best_cfg[0]}, alpha={best_cfg[1]})")
    plt.xlabel("Epoch")
    plt.ylabel("J(θ) - MSE on Train")
    plt.title("Convergence of Gradient Descent Variants")
    plt.legend()

    # Simpan gambar
    plot_path = os.path.join(os.path.dirname(csv_path), "konvergensi_gd.png")
    plt.tight_layout()
    plt.savefig(plot_path, dpi=150)
    print(f"\n[Saved] Plot konvergensi: {plot_path}")
    plt.show()

    # --------------
    # Build comparison table
    # --------------
    results = pd.DataFrame([
        {"Metode":"Normal Equation", "Waktu Komputasi (detik)": t_ne,   "MSE (Test)": mse_ne},
        {"Metode":"Batch GD",        "Waktu Komputasi (detik)": t_bgd,  "MSE (Test)": mse_bgd},
        {"Metode":"SGD",             "Waktu Komputasi (detik)": t_sgd,  "MSE (Test)": mse_sgd},
        {"Metode":"Mini-Batch GD",   "Waktu Komputasi (detik)": t_mbgd, "MSE (Test)": mse_mbgd},
    ]).sort_values("MSE (Test)").reset_index(drop=True)

    out_csv = os.path.join(os.path.dirname(csv_path), "hasil_perbandingan_regresi.csv")
    results.to_csv(out_csv, index=False)
    print(f"[Saved] Tabel perbandingan: {out_csv}\n")
    print(results)

if __name__ == "__main__":
    main()


FileNotFoundError: --f=/run/user/1000/jupyter/runtime/kernel-v3354f02ffeeea6a339fced15295bc4273133eba82.json